from argparse import ArgumentParser
import gc
from pprint import pprint
from tempfile import TemporaryDirectory

import numpy as np
import pandas as p
from tqdm import trange

from util import getQuery, datevalid

# Tables the -t/--table option may select from. Used as argparse `choices`,
# which also whitelists the identifier interpolated into the SQL in getkwh.
tables = [
    'public.best_icp',        # All icps with at least 360 days of data in 2017
    'public.best_icp_1618',   # All icps with at least 720 days of data in 2 years from 1 April 2016
    'public.best_icp_18m',    # All icps with at least 540 days of data from July 2016 to end of 2017
    'public.icp_sample',      # A pre-generated 1k sample from best_icp
    'public.icp_sample_5k',   # A pre-generated 5k sample from best_icp
    'public.icp_sample_1618', # A pre-generated 1k sample from best_icp_1618
    'public.icp_sample_18m'   # A pre-generated 1k sample from best_icp_18m
]


def getkwh(datestart, dateend, timestart, timeend, icp_tab, verbose=True):
    """Get kwh data from the database for one date range.

    Builds a dense half-hourly grid (every icp in ``icp_tab`` x every
    30-minute timestamp in [timestart, timeend]) and left-joins the actual
    readings onto it, so missing readings appear as kwh_tot = 0.

    Parameters
    ----------
    datestart, dateend : str
        'YYYY-MM-DD' date bounds; dateend is exclusive in the readings query.
    timestart, timeend : str
        Timestamp bounds (inclusive) for the half-hourly series.
    icp_tab : str
        Name of the icp table to sample from; must be one of `tables`.
    verbose : bool, optional
        Print query parameters and progress messages.

    Returns
    -------
    pandas.DataFrame
        Columns icp_id (int32), read_time, kwh_tot (float16).
    """
    query = """
    SELECT SUBSTRING(comb.icp_id FROM 2 FOR 6)::int AS icp_id,
           comb.read_time,
           COALESCE(kwh_tot, 0) AS kwh_tot
    FROM (
        SELECT read_time, icp_id
        FROM (
            SELECT read_time
            FROM GENERATE_SERIES(%(tsstart)s::timestamp,
                                 %(tsend)s::timestamp,
                                 '30 minutes'::interval) read_time
        ) AS tsdata CROSS JOIN {}
    ) AS comb
    LEFT JOIN (
        SELECT *,
               read_date + CONCAT(period / 2, ':', period %% 2 * 30, ':00')::time AS read_time
        FROM (
            SELECT a.icp_id,
                   a.read_date,
                   c.period,
                   sum(c.read_kwh) as kwh_tot,
                   sum(case when a.content_code = 'UN' then c.read_kwh else 0 end) as kwh_un,
                   sum(case when a.content_code in ('CN','EG') then c.read_kwh else 0 end) as kwh_cn
            FROM coup_prd.coupdatamaster a,
                 unnest(a.read_array) WITH ORDINALITY c(read_kwh, period)
            WHERE a.read_date >= to_date(%(datestart)s,'yyyy-mm-dd')
              and a.read_date < to_date(%(dateend)s,'yyyy-mm-dd')
              and a.content_code ~ ('UN|CN|EG')
              AND a.icp_id IN (
                  SELECT icp_id FROM {}
              )
            GROUP BY 1, 2, 3
        ) AS coup_tall
    ) AS tall_timestamp
    ON comb.read_time = tall_timestamp.read_time
    AND comb.icp_id = tall_timestamp.icp_id;
    """
    # icp_tab is a table *identifier*, so it cannot be a bind parameter; it is
    # interpolated with str.format but constrained to the `tables` whitelist
    # by the argparse `choices` in main().
    query = query.format(icp_tab, icp_tab)
    pdict = {
        'datestart': datestart,
        'dateend': dateend,
        'tsstart': timestart,
        'tsend': timeend
    }
    if verbose:
        print("Getting data with parameters:")
        pprint(pdict)
    qdf = getQuery(query, pdict, verbose)
    if verbose:
        print("Optimising")
    # Downcast to shrink the (potentially very large) frame in memory;
    # float16 is deliberate — readings don't need more precision here.
    qdf['icp_id'] = qdf['icp_id'].astype(np.int32)
    qdf['kwh_tot'] = qdf['kwh_tot'].astype(np.float16)
    return qdf


def collateddownload(startd, endd, numdivis, icp_tab, pivot, verbose):
    """Download the dataset in `numdivis` date-range pieces, then combine.

    Each piece is fetched with getkwh, optionally pivoted, and spilled to a
    temporary pickle so only one piece is held in memory during download.

    Parameters
    ----------
    startd, endd : date-like
        Overall date range to download.
    numdivis : int
        Number of equal date sub-ranges to split the download into.
    icp_tab : str
        Icp table name passed through to getkwh.
    pivot : bool
        If True, pivot each piece to wide format (read_time x icp_id).
    verbose : bool
        Passed through to getkwh; also prints pivot progress.

    Returns
    -------
    pandas.DataFrame
        The concatenation of all pieces.
    """
    with TemporaryDirectory() as tempdir:
        # numdivis + 1 fenceposts -> numdivis adjacent [low, high) date ranges.
        divset = p.date_range(startd, endd, periods=numdivis + 1).strftime("%Y-%m-%d")
        divlow = divset[:-1]
        divhig = divset[1:]
        for i in trange(numdivis):
            datestart = divlow[i]
            dateend = divhig[i]
            # Start each piece at 00:30 and end at 00:00 so adjacent pieces
            # share no half-hour timestamp (GENERATE_SERIES is end-inclusive).
            datetimestart = datestart + " 00:30:00"
            datetimeend = dateend + " 00:00:00"
            filename = "{}/{}temp.pkl".format(tempdir, i)
            tdf = getkwh(datestart, dateend, datetimestart, datetimeend,
                         icp_tab, verbose)
            if pivot:
                if verbose:
                    print("Pivoting")
                tdf = tdf.pivot(index='read_time', columns='icp_id',
                                values='kwh_tot')
            tdf.to_pickle(filename)
            del tdf
        # Re-load the pieces and concatenate ONCE: the previous pairwise
        # p.concat inside the loop re-copied the accumulated frame each
        # iteration (quadratic in total rows).
        pieces = [p.read_pickle("{}/{}temp.pkl".format(tempdir, i))
                  for i in trange(numdivis)]
        coldf = p.concat(pieces)
        del pieces
        gc.collect()
        return coldf


def main():
    """Command-line entry point: parse arguments, download, pickle result."""
    parser = ArgumentParser(description='Download kwh data from database')
    parser.add_argument("-o", "--output", dest="output",
                        help="output pickle path", metavar="PATH",
                        required=True)
    parser.add_argument("-s", "--start-date", dest="startdate",
                        help="start date for download; format: YYYY-MM-DD; default: 2017-01-01",
                        metavar="DATE", default="2017-01-01", type=datevalid)
    parser.add_argument("-e", "--end-date", dest="enddate",
                        help="end date for download; format: YYYY-MM-DD; default: 2018-01-01",
                        metavar="DATE", default="2018-01-01", type=datevalid)
    parser.add_argument("-t", "--table", dest="table",
                        help="table for download (constrained to specific values in source); default: public.icp_sample",
                        metavar="TABLE", default="public.icp_sample",
                        choices=tables)
    parser.add_argument("-n", "--num-div", dest="numdiv",
                        help="number of segments to divide download into",
                        metavar="NUM", default=12, type=int)
    # store_false: pivoting is the default; --no-pivot leaves the data tall.
    parser.add_argument("--no-pivot", dest="pivot",
                        help="output dataframe in tall (non-pivoted) format",
                        action="store_false")
    parser.add_argument("-v", "--verbose", dest="verbose",
                        action="store_true")
    args = parser.parse_args()
    cdata = collateddownload(args.startdate, args.enddate, args.numdiv,
                             args.table, args.pivot, args.verbose)
    cdata.to_pickle(args.output)


if __name__ == "__main__":
    main()