案例 | 通过PySpark实现城市热点大数据统计



1 需求分析


在互联网中,我们经常会见到城市热点图这样的报表数据,例如在百度统计中,会统计今年的热门旅游城市、热门报考学校等,会将这样的信息显示在热点图中。我们根据每个用户的IP地址,与我们的IP地址段进行比较,确认每个IP落在哪一个IP端内,获取经纬度,然后绘制热力图。

因此,我们需要通过日志信息(运行商或者网站自己生成)和城市ip段信息来判断用户的ip段,统计热点经纬度。




2 技术调研


因为我们的需求是完成一张报表信息,所以对程序的实时性没有要求,所以可以选择内存计算spark来实现上述功能。




3 数据准备


ip日志信息

城市ip段信息




4 代码开发


整体步骤:

1、加载城市ip段信息,获取ip起始数字和结束数字,经度,维度

2、加载日志数据,获取ip信息,然后转换为数字,和ip段比较

3、比较的时候采用二分法查找,找到对应的经度和维度

4、然后对经度和维度做单词计数

完整代码:

整体步骤

#-*- coding: utf-8 -*-
from pyspark.sql import SparkSession
# 255.255.255.255 0~255 256  2^8 8位2进制数 32位2进制数
#将ip转换为特殊的数字形式  223.243.0.0|223.243.191.255|  255 2^8
#‭11011111‬
#00000000
#1101111100000000
#‭        11110011‬
#11011111111100110000000000000000
def ip_transform(ip):
    ips = ip.split(".")#[223,243,0,0] 32位二进制数
    ip_num = 0
    for i in ips:
        ip_num = int(i) | ip_num << 8
    return ip_num

#二分法查找ip对应的行的索引
def binary_search(ip_num, broadcast_value):
    start = 0
    end = len(broadcast_value) - 1
    while (start <= end):
        mid = int((start + end) / 2)
        if ip_num >= int(broadcast_value[mid][0]) and ip_num <= int(broadcast_value[mid][1]):
            return mid
        if ip_num < int(broadcast_value[mid][0]):
            end = mid
        if ip_num > int(broadcast_value[mid][1]):
            start = mid

def main():
    spark = SparkSession.builder.appName("test").getOrCreate()
    sc = spark.sparkContext
    city_id_rdd = sc.textFile("file:///export/pyfolder1/pyspark-chapter02_3.8/data/ip/ip.txt").map(lambda x:x.split("|")).map(lambda x: (x[2], x[3], x[13], x[14]))
    #创建一个广播变量
    city_broadcast = sc.broadcast(city_id_rdd.collect())
    dest_data = sc.textFile("file:///export/pyfolder1/pyspark-chapter02_3.8/data/ip/20190121000132394251.http.format").map(
        lambda x: x.split("|")[1])
    #根据取出对应的位置信息
    def get_pos(x):
        city_broadcast_value = city_broadcast.value
        #根据单个ip获取对应经纬度信息
        def get_result(ip):
            ip_num = ip_transform(ip)
            index = binary_search(ip_num, city_broadcast_value)
            #((纬度,精度),1)
            return ((city_broadcast_value[index][2], city_broadcast_value[index][3]), 1)

        x = map(tuple,[get_result(ip) for ip in x])
        return x

    dest_rdd = dest_data.mapPartitions(lambda x: get_pos(x)) #((纬度,精度),1)
    result_rdd = dest_rdd.reduceByKey(lambda a, b: a + b)
    print(result_rdd.collect())
    sc.stop()

if __name__ == '__main__':
    main()
    #[(('107.08166', '29.85359'), 29), (('108.948024', '34.263161'), 1824), (('114.502461', '38.045474'), 383), (('106.51107', '29.50197'), 91), (('102.712251', '25.040609'), 126), (('106.57434', '29.60658'), 177), (('116.405285', '39.904989'), 1535), (('107.7601', '29.32548'), 85), (('106.504962', '29.533155'), 400), (('106.27633', '29.97227'), 36), (('107.39007', '29.70292'), 47), (('106.56347', '29.52311'), 3)]




5 广播变量的使用


要统计Ip所对应的经纬度, 每一条数据都会去查询ip表

每一个task 都需要这一个ip表, 默认情况下, 所有task都会去复制ip表

实际上 每一个Worker上会有多个task, 数据也是只需要进行查询操作的, 所以这份数据可以共享,没必要每个task复制一份

可以通过广播变量, 通知当前worker上所有的task, 来共享这个数据,避免数据的多次复制,可以大大降低内存的开销

sparkContext.broadcast(要共享的数据)

转自:Ingemar

相关推荐

  • Webflow:40亿美金估值的无代码建站SaaS,能否跨越下个技术周期?
  • 从 0 实现 use-context-selector
  • 谷歌ChatGPT演砸了
  • 九种跨域方式实现原理
  • API网关策略的二三事
  • Android 14开发者预览版发布,支持应用双开、多用户登录
  • 中国开源社区健康案例——龙蜥社区
  • 前沿开源技术领域解读——开源AI
  • 全球开源技术峰会,优质议题征集
  • 第一位计算机科学博士诞生 | 历史上的今天
  • 工程师“魔改” AirPods Pro 接口,苹果“妥协”将成大势所趋?
  • 尴尬!谷歌版 ChatGPT 全球首秀“大翻车”,市值狂跌 7000 亿
  • 全球爆红的 ChatGPT 是如何诞生的?| 《架构师》二月刊开放下载
  • Tech Lead如何成为团队协作间的“润滑剂”?| ArchSummit
  • 男子刚领证7天被妻子打进医院;韩国申报的文化遗产来自日本;男子花35万找人跟妻子结婚;抖音即将上线外卖服务......|酷玩日爆
  • CTO 说了,用错 @Autowired 和 @Resource 的人可以领盒饭了
  • Hinton努力推翻自己积累了30年的学术成果,我才知道什么叫生命力!
  • 广西人怎么背着我们建了「疯狂动物城」
  • 2023,音视频技术将如何发展?| Q推荐
  • Node.js应用全链路追踪技术——全链路信息存储