Python多进程处理:如何将大量数据放入有限内存
admin
2023-07-31 01:42:49
0

简介

这是一篇有关如何将大量的数据放入有限的内存中的简略教程。

与客户工作时,有时会发现他们的数据库实际上只是一个csv或Excel文件仓库,你只能将就着用,经常需要在不更新他们的数据仓库的情况下完成工作。大部分情况下,如果将这些文件存储在一个简单的数据库框架中或许更好,但时间可能不允许。这种方法对时间、机器硬件和所处环境都有要求。

下面介绍一个很好的例子:假设有一堆表格(没有使用Neo4j、MongoDB或其他类型的数据库,仅仅使用csvs、tsvs等格式存储的表格),如果将所有表格组合在一起,得到的数据帧太大,无法放入内存。所以第一个想法是:将其拆分成不同的部分,逐个存储。这个方案看起来不错,但处理起来很慢。除非我们使用多核处理器。

目标

这里的目标是从所有职位中(大约1万个),找出相关的的职位。将这些职位与政府给的职位代码组合起来。接着将组合的结果与对应的州(行政单位)信息组合起来。然后用通过word2vec生成的属性信息在我们的客户的管道中增强已有的属性。

这个任务要求在短时间内完成,谁也不愿意等待。想象一下,这就像在不使用标准的关系型数据库的情况下进行多个表的连接。

数据

职位数据

referencenumber title postdate url company city state description
1652398203 Sales Associate 2014-07-09 13:47:18 URL link Company Name City State Our Sales Associates are…

“表格太长,请到原文查看。”

标题数据

ID Title
82 Pediatricians, General

OES数据

area area_title area_type naics naics_title own_code 后略…
99 U.S. 1 000000 Cross-industry 1235 00-0000

“表格太长,请到原文查看。”

SOC表

2010 SOC Code 2010 SOC Title 2010 SOC Direct Match Title llustrative Example
11-1011 Chief Executives CEO

示例脚本

下面的是一个示例脚本,展示了如何使用multiprocessing来在有限的内存空间中加速操作过程。脚本的第一部分是和特定任务相关的,可以自由跳过。请着重关注第二部分,这里侧重的是multiprocessing引擎。

1234567891011121314151617181920212223242526272829303132333435363738394041424344 #import the necessary packagesimport pandas as pdimport usimport numpy as npfrom multiprocessing import Pool,cpu_count,Queue,Manager # the data in one particular column was number in the form that horrible excel version # of a number where \’12000\’ is \’12,000\’ with that beautiful useless comma in there. # did I mention I excel bothers me?# instead of converting the number right away, we only convert them when we need todef median_maker(column):    return np.median([int(x.replace(\’,\’,\’\’)) for x in column]) # dictionary_of_dataframes contains a dataframe with information for each title; e.g title is \’Data Scientist\’# related_title_score_df is the dataframe of information for the title; columns = [\’title\’,\’score\’] ### where title is a similar_title and score is how closely the two are related, e.g. \’Data Analyst\’, 0.871# code_title_df contains columns [\’code\’,\’title\’]# oes_data_df is a HUGE dataframe with all of the Bureau of Labor Statistics(BLS) data for a given time period (YAY FREE DATA, BOO BAD CENSUS DATA!) def job_title_location_matcher(title,location):    try:        related_title_score_df = dictionary_of_dataframes[title]        # we limit dataframe1 to only those related_titles that are above         # a previously established threshold        related_title_score_df = related_title_score_df[title_score_df[\’score\’]>80]         #we merge the related titles with another table and its codes        codes_relTitles_scores = pd.merge(code_title_df,related_title_score_df)        codes_relTitles_scores = codes_relTitles_scores.drop_duplicates()         # merge the two dataframes by the codes        merged_df = pd.merge(codes_relTitles_scores, oes_data_df)        #limit the BLS data to the state we want        all_merged = merged_df[merged_df[\’area_title\’]==str(us.states.lookup(location).name)]         #calculate some summary statistics for the time we want        group_med_emp,group_mean,group_pct10,group_pct25,group_median,group_pct75,group_pct90 = all_merged[[\’tot_emp\’,\’a_mean\’,\’a_pct10\’,\’a_pct25\’,\’a_median\’,\’a_pct75\’,\’a_pct90\’]].apply(median_maker)        row = [title,location,group_med_emp,group_mean,group_pct10,group_pct25, group_median, group_pct75, group_pct90]        #convert it all to strings so we can combine them all when writing to file        row_string = [str(x) for x in row]        return row_string    except:        # if it doesnt work for a particular title/state just throw it out, there are enough to make this insignificant        \’do nothing\’

这里发生了神奇的事情:

1234567891011121314151617181920212223242526272829303132333435363738394041 #runs the function and puts the answers in the queuedef worker(row, q):        ans = job_title_location_matcher(row[0],row[1])        q.put(ans) # this writes to the file while there are still things that could be in the queue# this allows for multiple processes to write to the same file without blocking eachotherdef listener(q):    f = open(filename,\’wb\’)    while 1:        m = q.get()        if m ==\’kill\’:                break        f.write(\’,\’.join(m) + \’n\’)        f.flush()    f.close() def main():    #load all your data, then throw out all unnecessary tables/columns    filename = \’skill_TEST_POOL.txt\’     #sets up the necessary multiprocessing tasks     manager = Manager()    q = manager.Queue()    pool = Pool(cpu_count() + 2)    watcher = pool.map_async(listener,(q,))     jobs = []    #titles_states is a dataframe of millions of job titles and states they were found in    for i in titles_states.iloc:        job = pool.map_async(worker, (i, q))        jobs.append(job)     for job in jobs:        job.get()    q.put(\’kill\’)    pool.close()    pool.join() if __name__ == \”__main__\”:    main()

由于每个数据帧的大小都不同(总共约100Gb),所以将所有数据都放入内存是不可能的。通过将最终的数据帧逐行写入内存,但从来不在内存中存储完整的数据帧。我们可以完成所有的计算和组合任务。这里的“标准方法”是,我们可以仅仅在“job_title_location_matcher”的末尾编写一个“write_line”方法,但这样每次只会处理一个实例。根据我们需要处理的职位/州的数量,这大概需要2天的时间。而通过multiprocessing,只需2个小时。

虽然读者可能接触不到本教程处理的任务环境,但通过multiprocessing,可以突破许多计算机硬件的限制。本例的工作环境是c3.8xl ubuntu ec2,硬件为32核60Gb内存(虽然这个内存很大,但还是无法一次性放入所有数据)。这里的关键之处是我们在60Gb的内存的机器上有效的处理了约100Gb的数据,同时速度提升了约25倍。通过multiprocessing在多核机器上自动处理大规模的进程,可以有效提高机器的利用率。也许有些读者已经知道了这个方法,但对于其他人,可以通过multiprocessing能带来非常大的收益。顺便说一句,这部分是skill assets in the job-market这篇博文的延续。

相关内容

热门资讯

Mobi、epub格式电子书如... 在wps里全局设置里有一个文件关联,打开,勾选电子书文件选项就可以了。
500 行 Python 代码... 语法分析器描述了一个句子的语法结构,用来帮助其他的应用进行推理。自然语言引入了很多意外的歧义,以我们...
定时清理删除C:\Progra... C:\Program Files (x86)下面很多scoped_dir开头的文件夹 写个批处理 定...
scoped_dir32_70... 一台虚拟机C盘总是莫名奇妙的空间用完,导致很多软件没法再运行。经过仔细检查发现是C:\Program...
65536是2的几次方 计算2... 65536是2的16次方:65536=2⁶ 65536是256的2次方:65536=256 6553...
小程序支付时提示:appid和... [Q]小程序支付时提示:appid和mch_id不匹配 [A]小程序和微信支付没有进行关联,访问“小...
pycparser 是一个用... `pycparser` 是一个用 Python 编写的 C 语言解析器。它可以用来解析 C 代码并构...
微信小程序使用slider实现... 众所周知哈,微信小程序里面的音频播放是没有进度条的,但最近有个项目呢,客户要求音频要有进度条控制,所...
Apache Doris 2.... 亲爱的社区小伙伴们,我们很高兴地向大家宣布,Apache Doris 2.0.0 版本已于...
python清除字符串里非数字... 本文实例讲述了python清除字符串里非数字字符的方法。分享给大家供大家参考。具体如下: impor...