In R
customer = data.frame(CustomerId = c(1:6), Name = c("Angela", "Diana", "Cathy",
rep("Brandon", 2), "Eric"), State = c("CA", "TX", "IL", "AK", "CA", "AZ"))
print(customer)
## CustomerId Name State
## 1 1 Angela CA
## 2 2 Diana TX
## 3 3 Cathy IL
## 4 4 Brandon AK
## 5 5 Brandon CA
## 6 6 Eric AZ
product = data.frame(ProductId = c(1:4), Product = c("iPhone6", "iPhone6", "iWatch",
"MacBook"), Size = c(32, 64, NA, 256), Price = c(649, 749, 399, 1299))
print(product)
## ProductId Product Size Price
## 1 1 iPhone6 32 649
## 2 2 iPhone6 64 749
## 3 3 iWatch NA 399
## 4 4 MacBook 256 1299
purchase = data.frame(PurchaseId = c(1:7, 7), CustomerId = c(1, 2, 3, 3, 4,
4, 6, 6), ProductId = c(rep(1, 3), rep(2, 1), rep(3, 3), 1), Qty = c(1,
2, 3, 1, 2, 1, 1, 4))
print(purchase)
## PurchaseId CustomerId ProductId Qty
## 1 1 1 1 1
## 2 2 2 1 2
## 3 3 3 1 3
## 4 4 3 2 1
## 5 5 4 3 2
## 6 6 4 3 1
## 7 7 6 3 1
## 8 7 6 1 4
In Python
import numpy as np
import pandas as pd
from numpy import nan
CustomerId = range(1,7,1)
Name = ["Angela", "Diana", "Cathy", "Brandon", "Brandon", "Eric"]
State = ["CA", "TX", "IL", "AK", "CA", "AZ"]
customer = pd.DataFrame(np.column_stack([CustomerId, Name, State]), columns=['CustomerId', 'Name', 'State'])
print(customer)
ProductId = range(1,5,1)
Product = ["iPhone6", "iPhone6", "iWatch", "MacBook"]
Size = [32, 64, nan, 256]
Price = [649, 749, 399, 1299]
product = pd.DataFrame(np.column_stack([ProductId, Product, Size, Price]), columns=['ProductId', 'Product', 'Size', 'Price'])
product['ProductId'] = product['ProductId'].astype(int)
product['Size'] = product['Size'].astype(float)
product['Price'] = product['Price'].astype(float)
print(product)
PurchaseId = [1,2,3,4,5,6,7,7]
CustomerId = [1,2,3,3,4,4,6,6]
ProductId = [1,1,1,2,3,3,3,1]
Qty = [1,2,3,1,2,1,1,4]
purchase = pd.DataFrame(np.column_stack([PurchaseId, CustomerId, ProductId, Qty]), columns=['PurchaseId', 'CustomerId', 'ProductId', 'Qty'])
print(purchase)
## CustomerId Name State
## 0 1 Angela CA
## 1 2 Diana TX
## 2 3 Cathy IL
## 3 4 Brandon AK
## 4 5 Brandon CA
## 5 6 Eric AZ
## ProductId Product Size Price
## 0 1 iPhone6 32.0 649.0
## 1 2 iPhone6 64.0 749.0
## 2 3 iWatch NaN 399.0
## 3 4 MacBook 256.0 1299.0
## PurchaseId CustomerId ProductId Qty
## 0 1 1 1 1
## 1 2 2 1 2
## 2 3 3 1 3
## 3 4 3 2 1
## 4 5 4 3 2
## 5 6 4 3 1
## 6 7 6 3 1
## 7 7 6 1 4
In PySpark
customerDF = spark.createDataFrame(customer)
customerDF.show()
customerDF.cache()
productDF = spark.createDataFrame(product)
productDF.show()
productDF.cache()
purchaseDF = spark.createDataFrame(purchase)
purchaseDF.show()
purchaseDF.cache()
Summarize data (count, mean, standard deviation, etc.) by group
Find the number of models and the average and maximum price for each product.
In R
library(dplyr)
group_by(product, Product) %>% summarise(num_model=length(unique(Size)), mean_price=mean(Price), max_price=max(Price))
## # A tibble: 3 x 4
## Product num_model mean_price max_price
## <fct> <int> <dbl> <dbl>
## 1 iPhone6 2 699 749
## 2 iWatch 1 399 399
## 3 MacBook 1 1299 1299
The doBy package applies the same summary functions to every selected variable (or, with the . shorthand, to every non-grouping numeric variable):
library(doBy)
summaryBy(Size + Price ~ Product, data=product, FUN=list(l=length, avg=mean, max=max))
## Product Size.length Price.length Size.mean Price.mean Size.max Price.max
## 1 iPhone6 2 2 48 699 64 749
## 2 iWatch 1 1 NA 399 NA 399
## 3 MacBook 1 1 256 1299 256 1299
summaryBy(. ~ Product, data=product, FUN=list(l=length, m=mean, max=max))
## Product ProductId.length Size.length Price.length ProductId.mean
## 1 iPhone6 2 2 2 1.5
## 2 iWatch 1 1 1 3.0
## 3 MacBook 1 1 1 4.0
## Size.mean Price.mean ProductId.max Size.max Price.max
## 1 48 699 2 64 749
## 2 NA 399 3 NA 399
## 3 256 1299 4 256 1299
In Python
grouped = product.groupby('Product').agg({'Size':{'num_model':pd.Series.nunique}, 'Price':{'mean_price': 'mean', 'max_price': 'max'}})
print(grouped)
## FutureWarning: using a dict with renaming is deprecated and will be removed in a future version
## Size Price
## num_model mean_price max_price
## Product
## MacBook 1.0 1299.0 1299.0
## iPhone6 2.0 699.0 749.0
## iWatch 0.0 399.0 399.0
grouped = product.groupby('Product').agg({'Size':pd.Series.nunique, 'Price':['mean', 'max']})
print(grouped)
## Size Price
## nunique mean max
## Product
## MacBook 1.0 1299.0 1299.0
## iPhone6 2.0 699.0 749.0
## iWatch 0.0 399.0 399.0
In PySpark
from pyspark.sql import functions as F
grouped = productDF.groupBy("Product").agg(F.countDistinct("Size").alias("num_model"), F.avg("Price").alias("mean_price"), F.max("Price").alias("max_price"))
grouped.show()
Sort customers by Name (ascending) and State (descending).
In R
arrange(customer, Name, desc(State))
## CustomerId Name State
## 1 1 Angela CA
## 2 5 Brandon CA
## 3 4 Brandon AK
## 4 3 Cathy IL
## 5 2 Diana TX
## 6 6 Eric AZ
In Python
print(customer.sort_values(by=['Name', 'State'], ascending=[1, 0]))
## CustomerId Name State
## 0 1 Angela CA
## 4 5 Brandon CA
## 3 4 Brandon AK
## 2 3 Cathy IL
## 1 2 Diana TX
## 5 6 Eric AZ
In PySpark
customerDF.orderBy(['Name', 'State'], ascending=[1, 0]).show()
Append new customer records to the existing customer data.
In R
customer2 = data.frame(CustomerId=c(6:8), Name=c("Eric", "Frank", "George"), State=c("AZ" ,"ND", "SC"))
rbind(customer, customer2)
## CustomerId Name State
## 1 1 Angela CA
## 2 2 Diana TX
## 3 3 Cathy IL
## 4 4 Brandon AK
## 5 5 Brandon CA
## 6 6 Eric AZ
## 7 6 Eric AZ
## 8 7 Frank ND
## 9 8 George SC
In Python
CustomerId = range(6,9,1)
Name = ["Eric", "Frank", "George"]
State = ["AZ", "ND", "SC"]
customer2 = pd.DataFrame(np.column_stack([CustomerId, Name, State]), columns=['CustomerId', 'Name', 'State'])
pd.concat([customer, customer2])
## CustomerId Name State
## 0 1 Angela CA
## 1 2 Diana TX
## 2 3 Cathy IL
## 3 4 Brandon AK
## 4 5 Brandon CA
## 5 6 Eric AZ
## 0 6 Eric AZ
## 1 7 Frank ND
## 2 8 George SC
In PySpark
customer2DF = spark.createDataFrame(customer2)
customerDF.unionAll(customer2DF).collect()
or
sc.parallelize(customerDF.unionAll(customer2DF).collect()).toDF().show()
Find the customers that appear in both customer and customer2 (intersection).
In R
merge(customer, customer2, by=colnames(customer2))
## CustomerId Name State
## 1 6 Eric AZ
In Python
pd.merge(customer, customer2, how='inner', on=['CustomerId', 'Name', 'State'])
## CustomerId Name State
## 0 6 Eric AZ
In PySpark
print (customerDF.intersect(customer2DF).collect())
sc.parallelize(customerDF.intersect(customer2DF).collect()).toDF().show()
Include the name and state of the customer for each purchase.
In R
inner_join(purchase, customer, by=c("CustomerId"))
## PurchaseId CustomerId ProductId Qty Name State
## 1 1 1 1 1 Angela CA
## 2 2 2 1 2 Diana TX
## 3 3 3 1 3 Cathy IL
## 4 4 3 2 1 Cathy IL
## 5 5 4 3 2 Brandon AK
## 6 6 4 3 1 Brandon AK
## 7 7 6 3 1 Eric AZ
## 8 7 6 1 4 Eric AZ
In Python
print(pd.merge(purchase, customer, how='inner', on='CustomerId'))
## PurchaseId CustomerId ProductId Qty Name State
## 0 1 1 1 1 Angela CA
## 1 2 2 1 2 Diana TX
## 2 3 3 1 3 Cathy IL
## 3 4 3 2 1 Cathy IL
## 4 5 4 3 2 Brandon AK
## 5 6 4 3 1 Brandon AK
## 6 7 6 3 1 Eric AZ
## 7 7 6 1 4 Eric AZ
In PySpark
customerDF.join(purchaseDF, on='CustomerId', how='inner').show()
List all customers and their purchases.
In R
left_join(customer, purchase, by="CustomerId")
## CustomerId Name State PurchaseId ProductId Qty
## 1 1 Angela CA 1 1 1
## 2 2 Diana TX 2 1 2
## 3 3 Cathy IL 3 1 3
## 4 3 Cathy IL 4 2 1
## 5 4 Brandon AK 5 3 2
## 6 4 Brandon AK 6 3 1
## 7 5 Brandon CA NA NA NA
## 8 6 Eric AZ 7 3 1
## 9 6 Eric AZ 7 1 4
In Python
customer['CustomerId'] = customer['CustomerId'].astype(int)
purchase['CustomerId'] = purchase['CustomerId'].astype(int)
print(pd.merge(customer, purchase, how='left', on=['CustomerId']))
## CustomerId Name State PurchaseId ProductId Qty
## 0 1 Angela CA 1.0 1.0 1.0
## 1 2 Diana TX 2.0 1.0 2.0
## 2 3 Cathy IL 3.0 1.0 3.0
## 3 3 Cathy IL 4.0 2.0 1.0
## 4 4 Brandon AK 5.0 3.0 2.0
## 5 4 Brandon AK 6.0 3.0 1.0
## 6 5 Brandon CA NaN NaN NaN
## 7 6 Eric AZ 7.0 3.0 1.0
## 8 6 Eric AZ 7.0 1.0 4.0
In PySpark
purchaseDF = purchaseDF.withColumnRenamed('CustomerId', 'CustID')
result = customerDF.join(purchaseDF, (customerDF.CustomerId == purchaseDF.CustID), 'left')
result.orderBy('CustomerId').show(10)
List all products and the purchases that include them.
In R
right_join(purchase, product, by=c("ProductId"))
## PurchaseId CustomerId ProductId Qty Product Size Price
## 1 1 1 1 1 iPhone6 32 649
## 2 2 2 1 2 iPhone6 32 649
## 3 3 3 1 3 iPhone6 32 649
## 4 7 6 1 4 iPhone6 32 649
## 5 4 3 2 1 iPhone6 64 749
## 6 5 4 3 2 iWatch NA 399
## 7 6 4 3 1 iWatch NA 399
## 8 7 6 3 1 iWatch NA 399
## 9 NA NA 4 NA MacBook 256 1299
In Python
product['ProductId'] = product['ProductId'].astype(int)
purchase['ProductId'] = purchase['ProductId'].astype(int)
print(pd.merge(purchase, product, how='right', on='ProductId'))
## PurchaseId CustomerId ProductId Qty Product Size Price
## 0 1.0 1.0 1 1.0 iPhone6 32.0 649
## 1 2.0 2.0 1 2.0 iPhone6 32.0 649
## 2 3.0 3.0 1 3.0 iPhone6 32.0 649
## 3 7.0 6.0 1 4.0 iPhone6 32.0 649
## 4 4.0 3.0 2 1.0 iPhone6 64.0 749
## 5 5.0 4.0 3 2.0 iWatch nan 399
## 6 6.0 4.0 3 1.0 iWatch nan 399
## 7 7.0 6.0 3 1.0 iWatch nan 399
## 8 NaN NaN 4 NaN MacBook 256.0 1299
In PySpark
productDF = productDF.withColumnRenamed('ProductId', 'ProdId')
result = purchaseDF.join(productDF, (purchaseDF.ProductId == productDF.ProdId), 'right')
result.select('ProdId', 'PurchaseId', 'CustID', 'Qty', 'Product', 'Size', 'Price').orderBy('ProdId').show(10)
List all customers and purchases, keeping unmatched rows from both sides (full outer join).
In R
full_join(customer, purchase)
## CustomerId Name State PurchaseId ProductId Qty
## 1 1 Angela CA 1 1 1
## 2 2 Diana TX 2 1 2
## 3 3 Cathy IL 3 1 3
## 4 3 Cathy IL 4 2 1
## 5 4 Brandon AK 5 3 2
## 6 4 Brandon AK 6 3 1
## 7 5 Brandon CA NA NA NA
## 8 6 Eric AZ 7 3 1
## 9 6 Eric AZ 7 1 4
In Python
print(pd.merge(customer, purchase, how='outer', on=['CustomerId']))
## CustomerId Name State PurchaseId ProductId Qty
## 0 1 Angela CA 1.0 1.0 1.0
## 1 2 Diana TX 2.0 1.0 2.0
## 2 3 Cathy IL 3.0 1.0 3.0
## 3 3 Cathy IL 4.0 2.0 1.0
## 4 4 Brandon AK 5.0 3.0 2.0
## 5 4 Brandon AK 6.0 3.0 1.0
## 6 5 Brandon CA NaN NaN NaN
## 7 6 Eric AZ 7.0 3.0 1.0
## 8 6 Eric AZ 7.0 1.0 4.0
In PySpark
result = customerDF.join(purchaseDF, (customerDF.CustomerId == purchaseDF.CustID), 'outer')
result.select('CustomerId', 'Name', 'State', 'PurchaseId', 'ProductId', 'Qty').orderBy('CustomerId').show()
Cross join: pair every product with every purchase.
In R
merge(product, purchase, by=NULL)
## ProductId.x Product Size Price PurchaseId CustomerId ProductId.y Qty
## 1 1 iPhone6 32 649 1 1 1 1
## 2 2 iPhone6 64 749 1 1 1 1
## 3 3 iWatch NA 399 1 1 1 1
## 4 4 MacBook 256 1299 1 1 1 1
## 5 1 iPhone6 32 649 2 2 1 2
## 6 2 iPhone6 64 749 2 2 1 2
## 7 3 iWatch NA 399 2 2 1 2
## 8 4 MacBook 256 1299 2 2 1 2
## 9 1 iPhone6 32 649 3 3 1 3
## 10 2 iPhone6 64 749 3 3 1 3
## 11 3 iWatch NA 399 3 3 1 3
## 12 4 MacBook 256 1299 3 3 1 3
## 13 1 iPhone6 32 649 4 3 2 1
## 14 2 iPhone6 64 749 4 3 2 1
## 15 3 iWatch NA 399 4 3 2 1
## 16 4 MacBook 256 1299 4 3 2 1
## 17 1 iPhone6 32 649 5 4 3 2
## 18 2 iPhone6 64 749 5 4 3 2
## 19 3 iWatch NA 399 5 4 3 2
## 20 4 MacBook 256 1299 5 4 3 2
## 21 1 iPhone6 32 649 6 4 3 1
## 22 2 iPhone6 64 749 6 4 3 1
## 23 3 iWatch NA 399 6 4 3 1
## 24 4 MacBook 256 1299 6 4 3 1
## 25 1 iPhone6 32 649 7 6 3 1
## 26 2 iPhone6 64 749 7 6 3 1
## 27 3 iWatch NA 399 7 6 3 1
## 28 4 MacBook 256 1299 7 6 3 1
## 29 1 iPhone6 32 649 7 6 1 4
## 30 2 iPhone6 64 749 7 6 1 4
## 31 3 iWatch NA 399 7 6 1 4
## 32 4 MacBook 256 1299 7 6 1 4
Older versions of pandas have no dedicated cross-join option (recent releases add how='cross' to merge); a common workaround is to merge on a constant dummy key, sketched below.
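A minimal pandas sketch of that dummy-key workaround (the column name _key is arbitrary); it reproduces the R merge(product, purchase, by=NULL) result above:
product_x = product.copy()
purchase_x = purchase.copy()
product_x['_key'] = 1    # constant join key so every product row matches every purchase row
purchase_x['_key'] = 1
cross = pd.merge(product_x, purchase_x, on='_key', suffixes=('_prod', '_purch')).drop(columns='_key')
print(cross.shape)       # (32, 8): 4 products x 8 purchase rows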
In PySpark
spark.conf.set("spark.sql.crossJoin.enabled", True)
productDF.join(purchaseDF).show()
Advanced SQL is made easy with dplyr: its %>% pipe and simple "verbs"/functions chain multiple simple steps together to achieve a complex result.
dplyr | SQL |
---|---|
select(df, column, column:column, …) | select |
select(df, col1=column, …) | select column as col1 |
slice(df, position:position) | select top position |
distinct(select(df, column, …)) | select distinct column |
n_distinct(df, column) | select count(distinct column) |
count(df, column) | select count(column) |
mutate(df, col3 = col1 + col2, …) | select col1, col2, col1 + col2 as col3 |
transform(df, col3 = col1 + col2, col4 = col1 + col3, …) | select col1, col2, col1 + col2 as col3, col1 + col3 as col4 |
transmute(df, col3 = col1 + col2, col4 = col1 + col3, …) | select col1 + col2 as col3, col1 + col3 as col4 |
filter(df, expression operator expression, …) | where |
inner_join(df1, df2, by="col1") | select * from df1 join df2 where df1.col1 = df2.col1 |
left_join(df1, df2, by=c("x"="y")) | select * from df1 left join df2 where df1.x = df2.y |
right_join(df1, df2, by=c("x"="y")) | select * from df1 right join df2 where df1.x = df2.y |
full_join(df1, df2) | select * from df1 outer join df2 |
semi_join(df1, df2) | select df1.* from df1 join df2 |
anti_join(df1, df2) | select df1.* from df1 left join df2 where df2 is null |
group_by(df, column) | group by |
arrange(df, column, desc(column), …) | order by |
summarise(df, avg=mean(col1), max=max(col2)) | select mean(col1) avg, max(col2) max |
summarise(df, n(), d=n_distinct(column)) | select count(*), count(distinct column) as d |
union(df1, df2) | union |
intersect(df1, df2) | intersect |
setdiff(df1, df2) | except |
sample_n(df, sample_size, replace=TRUE) | … |
sample_frac(df, fraction, replace=FALSE) | where RAND() < fraction |
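The semi_join and anti_join rows above have no single pandas verb; one common pattern (a sketch using the indicator argument of pandas merge on the customer and purchase frames defined earlier) is:
# semi join: customers with at least one purchase (dplyr: semi_join(customer, purchase))
print(customer[customer['CustomerId'].isin(purchase['CustomerId'])])
# anti join: customers with no purchase (dplyr: anti_join(customer, purchase))
flagged = pd.merge(customer, purchase[['CustomerId']].drop_duplicates(), on='CustomerId', how='left', indicator=True)
print(flagged[flagged['_merge'] == 'left_only'].drop(columns='_merge'))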
Add a variable Cost which is Purchase.Qty * Product.Price for each purchased item.
In R
library(dplyr)
inner_join(product, purchase, by="ProductId") %>% transform(cost=Qty * Price)
## ProductId Product Size Price PurchaseId CustomerId Qty cost
## 1 1 iPhone6 32 649 1 1 1 649
## 2 1 iPhone6 32 649 2 2 2 1298
## 3 1 iPhone6 32 649 3 3 3 1947
## 4 1 iPhone6 32 649 7 6 4 2596
## 5 2 iPhone6 64 749 4 3 1 749
## 6 3 iWatch NA 399 5 4 2 798
## 7 3 iWatch NA 399 6 4 1 399
## 8 3 iWatch NA 399 7 6 1 399
In Python
joined = pd.merge(product, purchase, how='inner', on='ProductId')
joined['cost'] = joined['Qty'] * joined['Price']
print(joined)
## ProductId Product Size Price PurchaseId CustomerId Qty cost
## 0 1 iPhone6 32.0 649.0 1 1 1 649.0
## 1 1 iPhone6 32.0 649.0 2 2 2 1298.0
## 2 1 iPhone6 32.0 649.0 3 3 3 1947.0
## 3 1 iPhone6 32.0 649.0 7 6 4 2596.0
## 4 2 iPhone6 64.0 749.0 4 3 1 749.0
## 5 3 iWatch NaN 399.0 5 4 2 798.0
## 6 3 iWatch NaN 399.0 6 4 1 399.0
## 7 3 iWatch NaN 399.0 7 6 1 399.0
In PySpark
productDF = spark.createDataFrame(product)
purchaseDF = spark.createDataFrame(purchase)
resultDF = productDF.join(purchaseDF, (productDF.ProductId == purchaseDF.ProductId), 'inner')
resultDF.withColumn('cost', resultDF.Qty * resultDF.Price).show()
Find the number of items and the total due for each purchase, ordered by Total_cost descending.
In R
inner_join(product, purchase, by="ProductId") %>% group_by(PurchaseId) %>% summarise(Num_item=n(), Total_cost=sum(Qty*Price)) %>% arrange(desc(Total_cost))
## # A tibble: 7 x 3
## PurchaseId Num_item Total_cost
## <dbl> <int> <dbl>
## 1 7 2 2995
## 2 3 1 1947
## 3 2 1 1298
## 4 5 1 798
## 5 4 1 749
## 6 1 1 649
## 7 6 1 399
In Python
joined = pd.merge(product, purchase, how='inner', on='ProductId')
joined['total_cost'] = joined['Qty'] * joined['Price']
grouped = joined.groupby('PurchaseId').agg({'Qty':'count', 'total_cost':'sum'}).reset_index()
grouped = grouped.rename(columns={'Qty': 'Num_item'})
print(grouped.sort_values(by='total_cost', ascending=0))
## PurchaseId Num_item total_cost
## 6 7 2 2995.0
## 2 3 1 1947.0
## 1 2 1 1298.0
## 4 5 1 798.0
## 3 4 1 749.0
## 0 1 1 649.0
## 5 6 1 399.0
In PySpark
productDF = spark.createDataFrame(product)
purchaseDF = spark.createDataFrame(purchase)
joined = productDF.join(purchaseDF, (productDF.ProductId == purchaseDF.ProductId), 'inner')
joined = joined.withColumn('total_cost', joined.Qty * joined.Price)
from pyspark.sql import functions as F
grouped = joined.groupBy("PurchaseId").agg(F.count('Size').alias("Num_item"), F.sum("total_cost").alias("Total_cost"))
grouped.orderBy('Total_cost', ascending=0).show()
Find how many purchases were made, the number of unique products purchased, and the amount spent by each customer, ordered by the customer's State and Name.
In R
customer$Name <- as.character(customer$Name)
customer$State <- as.character(customer$State)
left_join(customer, purchase, by="CustomerId") %>% left_join(product, by="ProductId") %>% group_by(State, Name, CustomerId) %>% summarise(Num_purchase=n_distinct(PurchaseId), Num_product=sum(n_distinct(ProductId)), Total_amount=sum(Qty * Price)) %>% arrange(State)
## # A tibble: 6 x 6
## # Groups: State, Name [6]
## State Name CustomerId Num_purchase Num_product Total_amount
## <chr> <chr> <dbl> <int> <int> <dbl>
## 1 AK Brandon 4 2 1 1197
## 2 AZ Eric 6 1 2 2995
## 3 CA Angela 1 1 1 649
## 4 CA Brandon 5 1 1 NA
## 5 IL Cathy 3 2 2 2696
## 6 TX Diana 2 1 1 1298
In Python
joined = pd.merge(customer, purchase, how='left', on='CustomerId')
joined = pd.merge(joined, product, how='left', on='ProductId')
joined['total_cost'] = joined['Qty'] * joined['Price']
grouped = joined.groupby(['CustomerId', 'Name', 'State']).agg({'PurchaseId':'nunique', 'ProductId':'nunique', 'total_cost':'sum'}).reset_index()
grouped = grouped.rename(columns = {'PurchaseId': 'Num_purchase', 'ProductId': 'Num_product', 'total_cost':'Total_amount'})
print(grouped[['CustomerId', 'State', 'Name', 'Num_purchase', 'Num_product', 'Total_amount']].sort_values(by=['State', 'Name']))
## CustomerId State Name Num_purchase Num_product Total_amount
## 3 4 AK Brandon 2 1 1197.0
## 5 6 AZ Eric 1 2 2995.0
## 0 1 CA Angela 1 1 649.0
## 4 5 CA Brandon 0 0 0.0
## 2 3 IL Cathy 2 2 2696.0
## 1 2 TX Diana 1 1 1298.0
In PySpark
customerDF = spark.createDataFrame(customer)
productDF = spark.createDataFrame(product)
purchaseDF = spark.createDataFrame(purchase)
purchaseDF = purchaseDF.withColumnRenamed("CustomerId", "CustId").withColumnRenamed("ProductId", "ProdId")
joined = customerDF.join(purchaseDF, (customerDF.CustomerId == purchaseDF.CustId), 'left')
joined = joined.join(productDF, (joined.ProdId == productDF.ProductId), 'left')
joined = joined.withColumn('total_cost', joined.Qty * joined.Price)
from pyspark.sql import functions as F
grouped = sc.parallelize(joined.groupBy(['CustomerId', 'Name', 'State']).agg(F.countDistinct('PurchaseId').alias("Num_purchase"), F.countDistinct('ProductId').alias("Num_product"), F.sum("total_cost").alias("Total_amount")).collect()).toDF()
grouped.orderBy(['State', 'Name']).show()
Find how many purchases were made, the number of unique products purchased, and the total amount spent for customers who spent more than $1000, ordered by Total_amount descending.
In R
customer$State <- as.character(customer$State)
left_join(customer, purchase, by="CustomerId") %>% left_join(product, by="ProductId") %>% group_by(CustomerId) %>% summarise(State=min(State), Name=min(Name), Num_purchase=n_distinct(PurchaseId), Num_product=sum(n_distinct(ProductId)), Total_amount=sum(Qty * Price)) %>% filter(Total_amount > 1000) %>% arrange(desc(Total_amount))
## # A tibble: 4 x 6
## CustomerId State Name Num_purchase Num_product Total_amount
## <dbl> <chr> <chr> <int> <int> <dbl>
## 1 6 AZ Eric 1 2 2995
## 2 3 IL Cathy 2 2 2696
## 3 2 TX Diana 1 1 1298
## 4 4 AK Brandon 2 1 1197
In Python
joined = pd.merge(customer, purchase, how='left', on='CustomerId')
joined = pd.merge(joined, product, how='left', on='ProductId')
joined['total_cost'] = joined['Qty'] * joined['Price']
grouped = joined.groupby(['CustomerId', 'Name', 'State']).agg({'PurchaseId':'nunique', 'ProductId':'nunique', 'total_cost':'sum'}).reset_index()
grouped = grouped.rename(columns = {'PurchaseId': 'Num_purchase', 'ProductId': 'Num_product', 'total_cost':'Total_amount'})
grouped = grouped[grouped['Total_amount'] > 1000]
print(grouped[['CustomerId', 'State', 'Name', 'Num_purchase', 'Num_product', 'Total_amount']].sort_values(by=['Total_amount'], ascending=0))
## CustomerId State Name Num_purchase Num_product Total_amount
## 5 6 AZ Eric 1 2 2995.0
## 2 3 IL Cathy 2 2 2696.0
## 1 2 TX Diana 1 1 1298.0
## 3 4 AK Brandon 2 1 1197.0
In PySpark
customerDF = spark.createDataFrame(customer)
productDF = spark.createDataFrame(product)
purchaseDF = spark.createDataFrame(purchase)
purchaseDF = purchaseDF.withColumnRenamed("CustomerId", "CustId").withColumnRenamed("ProductId", "ProdId")
joined = customerDF.join(purchaseDF, (customerDF.CustomerId == purchaseDF.CustId), 'left')
joined = joined.join(productDF, (joined.ProdId == productDF.ProductId), 'left')
joined = joined.withColumn('total_cost', joined.Qty * joined.Price)
from pyspark.sql import functions as F
grouped = joined.groupBy(['CustomerId', 'Name', 'State']).agg(F.countDistinct('PurchaseId').alias("Num_purchase"), F.countDistinct('ProductId').alias("Num_product"), F.sum("total_cost").alias("Total_amount"))
grouped.filter(grouped.Total_amount > 1000).orderBy('Total_amount', ascending=0).show()
For customers who purchased an iPhone6, find how many purchases were made, the number of unique products purchased, and the total amount spent per customer, ordered by total quantity descending.
In R
filter(product, Product=="iPhone6") %>% inner_join(purchase, by="ProductId", "CustomerId") %>% select(CustomerId) %>% distinct(CustomerId) %>% inner_join(purchase, by="CustomerId") %>% inner_join(product, by="ProductId") %>% group_by(CustomerId) %>% summarise(Num_purchase=n_distinct(PurchaseId), Num_product=sum(n_distinct(ProductId)), Total_amount=sum(Qty * Price), Total_Qty=sum(Qty)) %>% arrange(desc(Total_Qty))
## # A tibble: 4 x 5
## CustomerId Num_purchase Num_product Total_amount Total_Qty
## <dbl> <int> <int> <dbl> <dbl>
## 1 6 1 2 2995 5
## 2 3 2 2 2696 4
## 3 2 1 1 1298 2
## 4 1 1 1 649 1
In Python
iPhone6_Id = product[product['Product'] == 'iPhone6']
joined = pd.merge(purchase, iPhone6_Id, how='inner', on='ProductId')
iPhone6_CustID = pd.DataFrame(joined.CustomerId.unique(), columns=['CustomerId'])
customer['CustomerId'] = customer['CustomerId'].astype(int)
iPhone6_Cust_Purchase = pd.merge(purchase, iPhone6_CustID, how='inner', on='CustomerId')
iPhone6_Cust_Product = pd.merge(iPhone6_Cust_Purchase, product, how='inner', on='ProductId')
iPhone6_Cust_Product['total_cost'] = iPhone6_Cust_Product['Qty'] * iPhone6_Cust_Product['Price']
grouped = iPhone6_Cust_Product.groupby(['CustomerId']).agg({'PurchaseId':'nunique', 'ProductId':'nunique', 'Qty':'sum', 'total_cost':'sum'}).reset_index()
grouped = grouped.rename(columns = {'PurchaseId': 'Num_purchase', 'ProductId': 'Num_product', 'Qty': 'Total_Qty', 'total_cost':'Total_amount'})
print(grouped[['CustomerId', 'Num_purchase', 'Num_product', 'Total_Qty', 'Total_amount']].sort_values(by=['Total_Qty'], ascending=0))
## CustomerId Num_purchase Num_product Total_Qty Total_amount
## 3 6 1 2 5 2995.0
## 2 3 2 2 4 2696.0
## 1 2 1 1 2 1298.0
## 0 1 1 1 1 649.0
In PySpark
customerDF = spark.createDataFrame(customer)
productDF = spark.createDataFrame(product)
purchaseDF = spark.createDataFrame(purchase)
purchaseDF = purchaseDF.withColumnRenamed("ProductId", "ProdId")
iphone6 = sc.parallelize(productDF.filter(productDF.Product == 'iPhone6').collect()).toDF()
iphone6_custID = purchaseDF.join(iphone6, (purchaseDF.ProdId == iphone6.ProductId), 'inner').select('CustomerId').distinct().withColumnRenamed("CustomerId", "CustId")
iphone6_cust_purchase = purchaseDF.join(iphone6_custID, (purchaseDF.CustomerId == iphone6_custID.CustId), 'inner')
iPhone6_Cust_Product = productDF.join(iphone6_cust_purchase, (productDF.ProductId == iphone6_cust_purchase.ProdId), 'inner')
iPhone6_Cust_Product = iPhone6_Cust_Product.withColumn('total_cost', iPhone6_Cust_Product.Qty * iPhone6_Cust_Product.Price)
from pyspark.sql import functions as F
grouped = iPhone6_Cust_Product.groupBy(['CustomerId']).agg(F.countDistinct('PurchaseId').alias("Num_purchase"), F.countDistinct('ProductId').alias("Num_product"), F.sum('Qty').alias("Total_Qty"), F.sum("total_cost").alias("Total_amount"))
grouped.orderBy('Total_Qty', ascending=0).show()
For each customer, list their purchases (at the PurchaseId level) and the associated cumulative revenue, ordered by customer State and Name.
In R
inner_join(purchase, product, by="ProductId") %>% transform(total=Qty*Price) %>% group_by(PurchaseId, CustomerId) %>% summarise(rev=sum(total)) %>% group_by(CustomerId) %>% mutate(cumrev=cumsum(rev)) %>% ungroup() %>% inner_join(customer) %>% arrange(State, Name, PurchaseId)
## # A tibble: 7 x 6
## PurchaseId CustomerId rev cumrev Name State
## <dbl> <dbl> <dbl> <dbl> <chr> <chr>
## 1 5 4 798 798 Brandon AK
## 2 6 4 399 1197 Brandon AK
## 3 7 6 2995 2995 Eric AZ
## 4 1 1 649 649 Angela CA
## 5 3 3 1947 1947 Cathy IL
## 6 4 3 749 2696 Cathy IL
## 7 2 2 1298 1298 Diana TX
In Python
joined = pd.merge(product, purchase, how='inner', on='ProductId')
joined['total_cost'] = joined['Qty'] * joined['Price']
grouped = joined.groupby(['CustomerId', 'PurchaseId']).agg({'total_cost':'sum'})#.reset_index()
grouped['cumrev'] = grouped.groupby(level=0)['total_cost'].apply(lambda x: x.cumsum())
grouped['pct'] = grouped.groupby(level=['CustomerId'])['total_cost'].apply(lambda x: 100 * x / float(x.sum()))
grouped = grouped.reset_index()
grouped.columns = ['CustomerId', 'PurchaseId', 'rev', 'cumrev', '%_cust_rev']
result = pd.merge(grouped, customer, how='inner', on='CustomerId')
print(result.sort_values(by=['State', 'Name', 'PurchaseId']))
## CustomerId PurchaseId rev cumrev %_cust_rev Name State
## 4 4 5 798.0 798.0 66.666667 Brandon AK
## 5 4 6 399.0 1197.0 33.333333 Brandon AK
## 6 6 7 2995.0 2995.0 100.000000 Eric AZ
## 0 1 1 649.0 649.0 100.000000 Angela CA
## 2 3 3 1947.0 1947.0 72.218101 Cathy IL
## 3 3 4 749.0 2696.0 27.781899 Cathy IL
## 1 2 2 1298.0 1298.0 100.000000 Diana TX
In PySpark
customerDF = spark.createDataFrame(customer)
productDF = spark.createDataFrame(product)
purchaseDF = spark.createDataFrame(purchase)
purchaseDF = purchaseDF.withColumnRenamed("ProductId", "ProdId").withColumnRenamed("CustomerId", "CustId")
joined = productDF.join(purchaseDF, productDF.ProductId == purchaseDF.ProdId, 'inner')
joined = joined.withColumn('total_cost', joined.Qty * joined.Price)
from pyspark.sql.window import Window
import pyspark.sql.functions as F
grouped = joined.groupBy(['PurchaseId', 'CustId']).agg(F.sum('total_cost').alias("Rev"))
myPartition = Window.partitionBy(grouped['CustId']).orderBy(grouped['PurchaseId'])
grouped_cum = grouped.withColumn('CumRev', F.sum(grouped.Rev).over(myPartition)).orderBy(['CustId', 'PurchaseId'])
cust_cum = grouped_cum.join(customerDF, (grouped_cum.CustId == customerDF.CustomerId), 'inner')
cust_cum.orderBy(['State', 'Name', 'PurchaseId']).show()
Number of iPhone6 units sold, and the percentage that are 32GB and 64GB.
In R
inner_join(purchase, product, by="ProductId") %>% filter(Product=="iPhone6") %>% summarise(iPhone6_sold=sum(Qty), GB32=round(sum(Qty[Size==32])/sum(Qty)*100,2), GB64=round(sum(Qty[Size==64]/sum(Qty))*100,2))
## iPhone6_sold GB32 GB64
## 1 11 90.91 9.09
In Python
iPhone6_Id = product[product['Product'].isin(['iPhone6'])]
joined = pd.merge(purchase, iPhone6_Id, how='inner', on='ProductId')
print(joined.groupby('Product').apply(lambda x: pd.Series(dict(sold=x.Qty.sum(), GB32=x[x['Size'] == 32].Qty.sum()/float(x.Qty.sum())*100, GB64=x[x['Size'] == 64].Qty.sum()/float(x.Qty.sum())*100))))
## sold GB32 GB64
## Product
## iPhone6 11.0 90.909091 9.090909
or
iPhone6_Id = product[product['Product'].isin(['iPhone6'])]
joined = pd.merge(purchase, iPhone6_Id, how='inner', on='ProductId')
print("iPhone6_sold:", (joined.groupby(['Product']).Qty.sum())[0])
print("iPhone6_sold by size in percentage:")
joined.groupby(['Product', 'Size']).Qty.sum() / joined.groupby(['Product']).Qty.sum() * 100
## iPhone6_sold: 11
## iPhone6_sold by size in percentage:
## Product Size
## iPhone6 32.0 90.909091
## 64.0 9.090909
## Name: Qty, dtype: float64
In PySpark
productDF = spark.createDataFrame(product)
purchaseDF = spark.createDataFrame(purchase)
purchaseDF = purchaseDF.withColumnRenamed("ProductId", "ProdId").withColumnRenamed("CustomerId", "CustId")
iphone6 = productDF.filter(productDF.Product.isin(['iPhone6']))
joined = iphone6.join(purchaseDF, iphone6.ProductId == purchaseDF.ProdId, 'inner')
import pyspark.sql.functions as F
grouped = joined.groupby('Product').agg(F.sum(joined['Qty']).alias('sold'), (F.sum(F.when(joined['Size']==32, joined['Qty']).otherwise(0))/F.sum(joined['Qty'])*100).alias('GB32'), (F.sum(F.when(joined['Size']==64, joined['Qty']).otherwise(0))/F.sum(joined['Qty'])*100).alias('GB64'))
grouped.show()