scala

超轻量级php框架startmvc

DataFrame:通过 Spark SQL 将 Scala 样例类(case class)转为 DataFrame 的方法

更新时间:2020-06-23 14:24:01 作者:startmvc
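摘要:本文通过一个完整示例,演示如何借助 Spark SQL 把 Scala 样例类(case class)的 RDD 转换为 DataFrame;示例代码以 import java.text.DecimalFormat、import com.alibaba.fastjson.JSON、import com.donews.data.AppConfig、import com.typesafe.config.ConfigFactory 等导入语句开头,完整代码见下文。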

如下所示:


import java.text.DecimalFormat
import com.alibaba.fastjson.JSON
import com.donews.data.AppConfig
import com.typesafe.config.ConfigFactory
import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.sql.{Row, SaveMode, DataFrame, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}
import org.slf4j.LoggerFactory
 
/**
 * Created by silentwolf on 2016/6/3.
 */
 
/**
 * One row of the user-tag result table: a user id followed by 27 `Float` tag scores.
 *
 * The field names are kept upper-case because the query in `UserTagTable.main`
 * selects them by exactly these names from the table built out of this class.
 * Field order matters as well: instances are constructed positionally.
 */
case class UserTag(
  // user identifier
  SUUID: String,
  // MAN / WOMAN scores
  MAN: Float, WOMAN: Float,
  // age-band scores
  AGE10_19: Float, AGE20_29: Float, AGE30_39: Float, AGE40_49: Float, AGE50_59: Float,
  // category scores
  GAME: Float, MOVIE: Float, MUSIC: Float, ART: Float, POLITICS_NEWS: Float,
  FINANCIAL: Float, EDUCATION_TRAINING: Float, HEALTH_CARE: Float, TRAVEL: Float, AUTOMOBILE: Float,
  HOUSE_PROPERTY: Float, CLOTHING_ACCESSORIES: Float, BEAUTY: Float, IT: Float, BABY_PRODUCT: Float,
  FOOD_SERVICE: Float, HOME_FURNISHING: Float, SPORTS: Float, OUTDOOR_ACTIVITIES: Float, MEDICINE: Float
)
 
/**
 * Spark job that builds one [[UserTag]] row per distinct `suuid` found in the
 * `appstatistic` parquet data and saves the result to the `USER_TAGS` table
 * through the `org.apache.phoenix.spark` data source.
 *
 * The tag scores themselves are random placeholder values produced by
 * [[userTagInfo]]; the point of the job is the case-class-to-DataFrame conversion.
 */
object UserTagTable {

  // Logger named after this object (previously it was created for an unrelated class).
  val LOG = LoggerFactory.getLogger(getClass)

  // Root directory of the report data on HDFS.
  val REP_HOME = s"${AppConfig.HDFS_MASTER}/${AppConfig.HDFS_REP}"

  // First day (inclusive) of the window that `main` loads; the last day comes from the CLI.
  private val START_DAY = "2016-04-10"

  // JSON keys whose scores are drawn independently in [0, 0.2); age50_59 is the remainder to 1.
  private val AGE_KEYS = Seq("age10_19", "age20_29", "age30_39", "age40_49")

  // JSON keys whose scores are drawn independently in [0, 1), in output order.
  private val INTEREST_KEYS = Seq(
    "game", "movie", "music", "art", "politics_news",
    "financial", "education_training", "health_care", "travel", "automobile",
    "house_property", "clothing_accessories", "beauty", "IT", "baby_Product",
    "food_service", "home_furnishing", "sports", "outdoor_activities", "medicine")

  /**
   * Job entry point.
   *
   * @param args `args(0)` is the appkey to process, `args(1)` the last day
   *             (inclusive) of the window. With fewer than two arguments the
   *             usage message is printed and nothing else happens.
   */
  def main(args: Array[String]): Unit = {
    // Validate before touching Spark: both arguments are read below, so a single
    // argument must be rejected too, and no context should be started just to print usage.
    if (args.length < 2) {
      println("请输入: appkey , StartTime : 2016-04-10 ,StartEnd :2016-04-11")
    } else {
      val appkey = args(0)
      val lastdate = args(1)

      val conf: com.typesafe.config.Config = ConfigFactory.load()
      val sc = new SparkContext()
      try {
        val sqlContext = new SQLContext(sc)
        import sqlContext.implicits._

        loadDataFrame(sqlContext, appkey, START_DAY, lastdate).registerTempTable("suuidTable")

        sqlContext.udf.register("taginfo", (a: String) => userTagInfo(a))
        sqlContext.udf.register("intToString", (b: Long) => intToString(b))

        // Key step: pair every suuid from the temp table with the JSON produced by the
        // UDF, turn each pair into a UserTag, and let toDF() derive the DataFrame schema
        // from the case class.
        sqlContext.sql(" select distinct(suuid) AS suuid,taginfo(suuid) from suuidTable group by suuid")
          .map { case Row(suuid: String, taginfo: String) => toUserTag(suuid, taginfo) }
          .toDF()
          .registerTempTable("resultTable")

        // NOTE(review): appkey and lastdate are spliced into the SQL text as quoted
        // literals; a value containing a single quote will break this statement.
        val resultDF = sqlContext.sql(s"select '$appkey' AS APPKEY, '$lastdate' AS DATE,SUUID ,MAN,WOMAN,AGE10_19,AGE20_29,AGE30_39 ," +
          "AGE40_49 ,AGE50_59,GAME,MOVIE,MUSIC,ART,POLITICS_NEWS,FINANCIAL,EDUCATION_TRAINING,HEALTH_CARE,TRAVEL,AUTOMOBILE," +
          "HOUSE_PROPERTY,CLOTHING_ACCESSORIES,BEAUTY,IT,BABY_PRODUCT ,FOOD_SERVICE ,HOME_FURNISHING ,SPORTS ,OUTDOOR_ACTIVITIES ," +
          "MEDICINE from resultTable WHERE SUUID IS NOT NULL")

        resultDF.write
          .mode(SaveMode.Overwrite)
          .options(Map("table" -> "USER_TAGS", "zkUrl" -> conf.getString("Hbase.url")))
          .format("org.apache.phoenix.spark")
          .save()
      } finally {
        // Release the context whether the job succeeded or failed.
        sc.stop()
      }
    }
  }

  /**
   * Builds a [[UserTag]] from a user id and the JSON document produced by [[userTagInfo]].
   *
   * @param suuid   user id stored in the `SUUID` field
   * @param taginfo JSON object text holding one numeric entry per tag key
   * @return the populated row
   */
  private def toUserTag(suuid: String, taginfo: String): UserTag = {
    val tags = JSON.parseObject(taginfo)
    UserTag(suuid,
      tags.getFloat("man"),
      tags.getFloat("woman"),
      tags.getFloat("age10_19"),
      tags.getFloat("age20_29"),
      tags.getFloat("age30_39"),
      tags.getFloat("age40_49"),
      tags.getFloat("age50_59"),
      tags.getFloat("game"),
      tags.getFloat("movie"),
      tags.getFloat("music"),
      tags.getFloat("art"),
      tags.getFloat("politics_news"),
      tags.getFloat("financial"),
      tags.getFloat("education_training"),
      tags.getFloat("health_care"),
      tags.getFloat("travel"),
      tags.getFloat("automobile"),
      tags.getFloat("house_property"),
      tags.getFloat("clothing_accessories"),
      tags.getFloat("beauty"),
      tags.getFloat("IT"),
      tags.getFloat("baby_Product"),
      tags.getFloat("food_service"),
      tags.getFloat("home_furnishing"),
      tags.getFloat("sports"),
      tags.getFloat("outdoor_activities"),
      tags.getFloat("medicine"))
  }

  /**
   * Renders a numeric id as its decimal string (registered as the `intToString` UDF).
   *
   * @param suuid the id to render
   * @return `suuid` in base-10 text form
   */
  def intToString(suuid: Long): String = suuid.toString

  /**
   * Generates a random tag profile as a compact JSON object.
   *
   * `man` is uniform in [0, 1) and `woman` is `1 - man`; four age bands are
   * uniform in [0, 0.2) and `age50_59` is the remainder to 1; every other key
   * is uniform in [0, 1). All values are rounded to two decimals.
   *
   * @param num1 unused; present so the function can be registered as a one-argument UDF
   * @return JSON text such as `{"man":0.42,"woman":0.58,...}`
   */
  def userTagInfo(num1: String): String = {
    // Pin the decimal symbols to Locale.ROOT: with the JVM default locale the formatter
    // may emit "0,42", which String.toFloat cannot parse. DecimalFormat is not
    // thread-safe, so every call creates its own instance.
    val twoDecimals = new DecimalFormat(
      "0.00", java.text.DecimalFormatSymbols.getInstance(java.util.Locale.ROOT))

    def round2(value: Double): Float = twoDecimals.format(value).toFloat

    def randomScore(upperBound: Double): Float =
      round2(scala.util.Random.nextDouble() * upperBound)

    val man = randomScore(1.0)
    val gender = Seq("man" -> man, "woman" -> round2(1 - man))

    val firstAges = AGE_KEYS.map(key => key -> randomScore(0.2))
    // Whatever the first four bands leave over goes to the last one.
    val lastAge = "age50_59" -> round2(firstAges.foldLeft(1f)(_ - _._2))

    val interests = INTEREST_KEYS.map(key => key -> randomScore(1.0))

    (gender ++ firstAges ++ Seq(lastAge) ++ interests)
      .map { case (key, score) => "\"" + key + "\":" + score }
      .mkString("{", ",", "}")
  }

  /**
   * Loads the `appstatistic` parquet data for one app and an inclusive day range.
   *
   * The predicate is built from column expressions rather than an SQL string, so
   * the argument values are always treated as literals, whatever characters they contain.
   *
   * @param ctx      SQL context used to read the parquet files
   * @param appkey   value the `appkey` column must equal
   * @param startDay lower bound (inclusive) for the `day` column
   * @param endDay   upper bound (inclusive) for the `day` column
   * @return rows with a non-null `timestamp` that match the app and the day range
   */
  def loadDataFrame(ctx: SQLContext, appkey: String, startDay: String, endDay: String): DataFrame = {
    val events = ctx.read.parquet(s"$REP_HOME/appstatistic")
    events.filter(
      events("timestamp").isNotNull &&
        events("appkey") === appkey &&
        events("day") >= startDay &&
        events("day") <= endDay)
  }

}

以上这篇DataFrame:通过SparkSql将scala类转为DataFrame的方法就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持脚本之家。

DataFrame SparkSql scala