- from argparse import ArgumentParser
- # from psycopg2 import sql
- import gc
- from util import getQuery, datevalid
- import pandas as p
- import gc
- from datetime import datetime
- from tqdm import tqdm, trange
- from pprint import pprint
- from tempfile import TemporaryDirectory
- import numpy as np
-
# Whitelist of ICP tables that may be queried; the CLI constrains the
# -t/--table option to exactly these values.
tables = [
    'public.best_icp',        # every icp with at least 360 days of data in 2017
    'public.best_icp_1618',   # every icp with at least 720 days of data in the 2 years from 1 April 2016
    'public.best_icp_18m',    # every icp with at least 540 days of data, July 2016 to end of 2017
    'public.icp_sample',      # pre-generated 1k sample from best_icp
    'public.icp_sample_5k',   # pre-generated 5k sample from best_icp
    'public.icp_sample_1618', # pre-generated 1k sample from best_icp_1618
    'public.icp_sample_18m',  # pre-generated 1k sample from best_icp_18m
]
-
-
def getkwh(datestart, dateend, timestart, timeend, icp_tab, verbose = True):
    """Fetch half-hourly kwh totals for every icp in `icp_tab`.

    Builds a dense (read_time x icp_id) grid by CROSS JOINing a generated
    30-minute timestamp series against the icp table, then LEFT JOINs the
    actual readings so missing periods come back as kwh_tot = 0.

    Parameters
    ----------
    datestart, dateend : str
        'YYYY-MM-DD' date bounds for the readings query (end exclusive).
    timestart, timeend : str
        'YYYY-MM-DD HH:MM:SS' bounds for the generated timestamp series.
    icp_tab : str
        Name of the icp table to join against.  NOTE(review): this is
        interpolated directly into the SQL text, so it MUST come from the
        trusted `tables` whitelist (argparse `choices` enforces this for
        the CLI path); only the date/time values are parameterized.
    verbose : bool
        Print the query parameters and progress messages.

    Returns
    -------
    pandas.DataFrame
        Columns: icp_id (int32), read_time, kwh_tot (float16).
    """
    query = """
    SELECT SUBSTRING(comb.icp_id FROM 2 FOR 6)::int AS icp_id, comb.read_time, COALESCE(kwh_tot, 0) AS kwh_tot
    FROM
    (
        SELECT read_time, icp_id
        FROM
        (
            SELECT read_time
            FROM GENERATE_SERIES(%(tsstart)s::timestamp, %(tsend)s::timestamp,
                '30 minutes'::interval) read_time
        ) AS tsdata CROSS JOIN {}
    ) AS comb
    LEFT JOIN
    (
        SELECT *, read_date + CONCAT(period / 2, ':', period %% 2 * 30, ':00')::time AS read_time
        FROM (
            SELECT a.icp_id
             , a.read_date
             , c.period
             , sum(c.read_kwh) as kwh_tot
             , sum(case when a.content_code = 'UN' then c.read_kwh else 0 end) as kwh_un
             , sum(case when a.content_code in ('CN','EG') then c.read_kwh else 0 end) as kwh_cn
            FROM coup_prd.coupdatamaster a,
            unnest(a.read_array) WITH ORDINALITY c(read_kwh, period)
            WHERE a.read_date >= to_date(%(datestart)s,'yyyy-mm-dd')
             and a.read_date < to_date(%(dateend)s,'yyyy-mm-dd')
             and a.content_code ~ ('UN|CN|EG')
             AND a.icp_id IN (
                SELECT icp_id FROM {}
             )
            GROUP BY 1, 2, 3
        ) AS coup_tall
    ) AS tall_timestamp
    ON comb.read_time = tall_timestamp.read_time AND comb.icp_id = tall_timestamp.icp_id;
    """
    # Table name appears twice in the statement (CROSS JOIN and IN-subquery).
    query = query.format(icp_tab, icp_tab)
    pdict = {
        'datestart': datestart,
        'dateend': dateend,
        'tsstart': timestart,
        'tsend': timeend
    }

    if verbose:
        print("Getting data with parameters:")
        pprint(pdict)
    qdf = getQuery(query, pdict, verbose)
    if verbose:
        print("Optimising")
    # Downcast to shrink the in-memory footprint of large pulls; float16 is
    # lossy (~3 significant figures) which is accepted here for kwh readings.
    qdf['icp_id'] = qdf['icp_id'].astype(np.int32)
    qdf['kwh_tot'] = qdf['kwh_tot'].astype(np.float16)
    return qdf
-
-
def collateddownload(startd, endd, numdivis, icp_tab, pivot, verbose):
    """Download the kwh dataset in `numdivis` date segments, then combine.

    Each segment is fetched with `getkwh`, optionally pivoted to a
    (read_time x icp_id) wide frame, and spilled to a pickle in a temporary
    directory so only one segment's raw pull is in memory at a time.  The
    spilled segments are then read back and concatenated.

    Parameters
    ----------
    startd, endd : str
        Overall 'YYYY-MM-DD' date range (end exclusive).
    numdivis : int
        Number of equal-length segments to split the range into (>= 1).
    icp_tab : str
        Icp table name, forwarded to `getkwh` (must be from the whitelist).
    pivot : bool
        If True, pivot each segment tall -> wide before spilling.
    verbose : bool
        Forwarded to `getkwh`; also prints pivot progress.

    Returns
    -------
    pandas.DataFrame
        All segments concatenated in date order.
    """
    with TemporaryDirectory() as tempdir:
        divset = p.date_range(startd, endd, periods = numdivis + 1).strftime("%Y-%m-%d")
        divlow = divset[:-1]
        divhig = divset[1:]
        for i in trange(numdivis):
            datestart = divlow[i]
            dateend = divhig[i]
            # Segment covers (datestart 00:30 .. dateend 00:00] so adjacent
            # segments do not duplicate the midnight reading.
            datetimestart = datestart + " 00:30:00"
            datetimeend = dateend + " 00:00:00"
            filename = "{}/{}temp.pkl".format(tempdir, i)
            tdf = getkwh(datestart, dateend, datetimestart, datetimeend, icp_tab, verbose)
            if pivot:
                if verbose:
                    print("Pivoting")
                tdf = tdf.pivot(index = 'read_time', columns = 'icp_id', values = 'kwh_tot')
            tdf.to_pickle(filename)
            del tdf
        # Read every segment back and concatenate ONCE: growing a frame with
        # repeated pairwise p.concat inside the loop copies all accumulated
        # rows each iteration (quadratic in total rows).
        frames = []
        for i in trange(numdivis):
            frames.append(p.read_pickle("{}/{}temp.pkl".format(tempdir, i)))
        coldf = p.concat(frames)
        del frames
        gc.collect()
    return coldf
-
-
def main():
    """Parse command-line options, run the collated download, and pickle it."""
    ap = ArgumentParser(description='Download kwh data from database')
    ap.add_argument("-o", "--output", dest="output",
                    help="output pickle path", metavar="PATH", required=True)
    ap.add_argument("-s", "--start-date", dest="startdate", type=datevalid,
                    help="start date for download; format: YYYY-MM-DD; default: 2017-01-01",
                    metavar="DATE", default="2017-01-01")
    ap.add_argument("-e", "--end-date", dest="enddate", type=datevalid,
                    help="end date for download; format: YYYY-MM-DD; default: 2018-01-01",
                    metavar="DATE", default="2018-01-01")
    ap.add_argument("-t", "--table", dest="table", choices=tables,
                    help="table for download (constrained to specific values in source); default: public.icp_sample",
                    metavar="TABLE", default="public.icp_sample")
    ap.add_argument("-n", "--num-div", dest="numdiv", type=int,
                    help="number of segments to divide download into",
                    metavar="NUM", default=12)
    # store_false: passing --no-pivot leaves the data in tall format.
    ap.add_argument("--no-pivot", dest="pivot",
                    help="output dataframe in tall (non-pivoted) format",
                    action="store_false")
    ap.add_argument("-v", "--verbose", dest="verbose", action="store_true")
    args = ap.parse_args()
    cdata = collateddownload(args.startdate, args.enddate, args.numdiv,
                             args.table, args.pivot, args.verbose)
    cdata.to_pickle(args.output)
-
-
-
-
# Entry point: run the download only when executed as a script, not on import.
if __name__ == "__main__":
    main()
|