在具体的分析或者特征工程之中,经常会遇到处理时间很久的问题,当然必要的优化是必须的。但是显然,数据量上升,计算量过大后,处理时间是必须的此。时,如果有个可以帮助您查看任务进度的进度条,必定可以提高你抓住处理时间去做(磨)别(洋)事(工)。当然逐行打印是不错的选择,但在Jupyter notebook/JupyterLab中,这种实践最大的问题是,打印过多,影响整个notebook的美观程度。

在此探讨的是5GB级别以下的数据(之上的Spark分析,有基于Zipplin的分布式任务精度条),主要环境是Jupyter下基于pandas包的分析和特征工程任务。

一个极为简单的例子

tqdm是基于Python的精度条模块,里面提供了简单的代码行进度条和基于ipywidgets的notebook内的进度条。由于现在tqdm相关模块还在开发阶段,可能会用到一些私有对象,之后正式版中可能具体API会有所变化。

当然,首先我们得载入模块,在notebook中使用tqdm带的基于Js显示的进度条前,请务必检查是否安装ipywidgets模块。

from tqdm import tqdm_notebook, _tqdm_notebook
_tqdm_notebook.tqdm_notebook.pandas()

其中第一行载入的两个方法的作用分别是:

  • tqdm_notebook:用来包装任何可以iterable的对象,在使用其元素进行运算结束后统计时间。
  • _tqdm_notebook:其中含有模块可以处理pandas的对象。

第二行则是重载pandas里面的对象,提供可以展示精度条的方法。

下面我们可以尝试直接使用tqdm_notebook包裹iterable对象来展示进度条,效果如下:

a = list(range(1, 10000))
b = range(1, 10000)
_ = [(lambda x: x+1)(i) for i in tqdm_notebook(a)]
_ = [(lambda x: x+1)(i) for i in tqdm_notebook(b)]

image-20180818130642527

当然如果仅仅是使用range也可以使用tqdm自带的tnrange

from tqdm._tqdm_notebook import tnrange
_ = [(lambda x: x+1)(i) for i in tnrange(1, 10000)]

效果如下:

image-20180818131834412

命名和深度

在一些场合,可能写需要多个层级的迭代,此时,我们可以通过命名每个层级的迭代器来实现这个个效果。使用desc参数即可:

for i in tnrange(1, 10, desc='i Loop'):
    for j in tnrange(1, 10000, desc='j Loop'):
        i+j

image-20180818132338185

当然,如果遇到Loop过多时,可能会依旧出现打印过多的困扰。此时leave参数是一个不错的推荐。

for i in tqdm_notebook(range(100), desc='i-Loop'):
    for j in tqdm_notebook(range(10000), desc='j-Loop', leave=False):
        i+j

image-20180818132953242

多进程的扩展

当然,在具体计算中,多进程往往是经常会需要的一类扩展(由于Python只能基于一个运算核心进行计算的限制),这时候线程的运算也是经常需要考量的方式。

在使用过程中,第一个需要注意的问题是,tqdm每次是在从iterable对象中取值时,进行更新,而如果在map之前的list中做进度条的包裹,是在未使用map的函数之前统计。所以在进度条完成时,可能还会有一段时间后才真的执行结束。

from multiprocessing import Pool
def f(x):
    return x**32
p = Pool(5)
_ = [i for i in p.imap(f, tnrange(1000))]

而一个更好的处理是在使用后标记时间,使用multiprocessing.Pool.imap作为迭代对象,但这个问题是tqdm无法识别具体数量,此时,指定tqdm的迭代次数total即可。

_ = [i+1 for i in tqdm_notebook(p.imap(f, range(1000)))]

image-20180818140646377

_ = [i for i in tqdm_notebook(p.imap(f, range(3, 1000)), total=997)]

image-20180818141522829

pandas中使用

pandas中的使用,也是非常简单,在重载命令执行后,SeriresDataFrameGroupBy对象都会拥有progress_apply方法,用法和apply一致,直接可以调取进度条。

image-20180818143609668

实战:复杂场景中的使用

最后,我们结合一下之前的多线程和pandas操作,处理更大规模的数据。基本思路是,将DataFrame拆成若干组分,最后通过pandas.concat合并起结果。

def parallelize_dataframe(df, func, n_jobs=3, split_num=10):
    ## 拆分数据表
    df_split = np.array_split(df, split_num)
    pool = Pool(n_jobs)
    df_list = []
    
    ## map操作
    for df_element in tqdm_notebook(pool.imap(func, df_split), total=10000):
        df_list.append(df_element)
       
    ## reduce操作
    df = pd.concat(df_list)
    
    ## 关闭进程
    pool.close()
    pool.join()
    return df

以上实现了基本的基于tqdm显示处理进度的操作。使用方法如下:

def apply_add_1(df):
    return df.apply(lambda row: row['sepal_length']+1, axis=1)
_ = parallelize_dataframe(iris_df, apply_add_1)

image-20180818145910290

结语

查看了一下进度条,这次预处理我还要花一小时,可以先去冲杯咖啡了。