Python example for downloading data to a CSV file

This is an example program written in Python 3. If you copy it and replace the value of the variable named 'refresh_token' (defined near the top of the script) with the token obtained for your user from the tbRetail application, the program will generate a file called 'tbretail_data.csv' with the requested data.
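For reference, the variable to edit sits near the top of the script below; the string it ships with is only a placeholder:

refresh_token = "put your refresh_token here"  # replace with the token from tbRetail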

Considerations:

  • By editing the FILE_FORMAT constant you can switch the output file format between CSV and XLSX.

  • In the getData query you can change the start_date and end_date fields to whatever dates you are interested in.

  • The metrics field selects the metric to download: ENTERS, EXITS, PASSERSBY, etc.

  • The category field controls the temporal grouping of the downloaded data: HOUR for hourly grouping, DAY for daily grouping, MONTH for monthly grouping, and so on (a quick reference for these settings is sketched below).

If the requested period is too large, tbRetail will return no data and the program will print the message 'No hay datos'.
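For quick reference, these are the settings the considerations above refer to, shown here with illustrative values rather than the script's defaults:

# Output format constant, near the top of the script:
FILE_FORMAT = "XLSX"   # "CSV" or "XLSX"

# Fields inside the getData query string, near the end of the script:
#   start_date: "2022-05-01"                     start of the requested period
#   end_date:   "2022-05-31"                     end of the requested period
#   metrics: [{name: ENTERS, operation: SUM}]    ENTERS, EXITS, PASSERSBY, ...
#   category: {dimension: DAY interval: 1}       HOUR, DAY, MONTH, ...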

import requests
import json
import datetime
import csv
import xlsxwriter
import io

refresh_token = "put your refresh_token here"
client_id = '63h4jubc7qvrg9f65c36vjp3bq'
redirect_uri = 'https://login.tbretail.com/mytoken/index.html'

DEFAULT_FILE_NAME = "tbretail_data"

# File configuration (the file will be created in the directory from which
# this script is executed)
FILE_FORMAT = "CSV"                   # "CSV" || "XLSX"
FILE_IS_INCLUDE_HEADER = True
FILE_NAME = DEFAULT_FILE_NAME         # Do not include the file extension
FILE__CSV_DELIMITER__STRING = "\""    # Only one-character strings allowed
FILE__CSV_DELIMITER__FIELD = ";"      # Only one-character strings allowed


def create_file_content(result_getdata):
    # @return the file content as an array of arrays:
    #
    # file_content = [
    #     [ col_1_header, col_2_header, ..., col_N_header ],  # Header is optional
    #     [ col_1_value, col_2_value, ..., col_N_value ],
    #     [ col_1_value, col_2_value, ..., col_N_value ],
    #     ...
    # ]

    # COLUMN NAMES
    columns = [
        "group_id",
        "start_datetime",
        "metric",
        "value"
    ]

    # FILE CONTENT
    file_content = []
    if FILE_IS_INCLUDE_HEADER:
        file_content.append(columns)

    # FILE CONTENT > ROWS
    series = result_getdata['series']
    categories = result_getdata['categories']
    for serie in series:
        group_id = serie['group']
        metric = serie['metric']
        serie_data = serie['data']
        for idx, category in enumerate(categories):
            # Categories are Unix timestamps in milliseconds
            unixdate = int(category) / 1000
            starttime = datetime.datetime.utcfromtimestamp(unixdate)
            value = serie_data[idx]
            file_content.append([
                group_id,
                starttime.strftime("%Y-%m-%d %H:%M:%S"),
                metric,
                value
            ])
    return file_content


def create_file(result_getdata):
    # Create file content
    file_content = create_file_content(result_getdata)

    file_name_with_extension = ""
    file = None
    file_size = 0

    # Create the CSV/XLSX file
    if FILE_FORMAT == "CSV":
        # https://docs.python.org/3/library/csv.html
        file_name_with_extension = f"{FILE_NAME}.csv"
        csv_file = io.StringIO()
        csv_content = csv.writer(
            csv_file,
            delimiter=FILE__CSV_DELIMITER__FIELD,   # Only one-character strings allowed.
            quotechar=FILE__CSV_DELIMITER__STRING,  # Only one-character strings allowed.
            doublequote=True,            # Default value
            escapechar=None,             # Default value
            quoting=csv.QUOTE_MINIMAL,   # Default value
            lineterminator="\r\n",       # Default value
            skipinitialspace=False,      # Default value
            strict=False                 # Default value
        )
        # REMINDER: the csv library has a dedicated "writeheader()" method,
        # but it only exists on "csv.DictWriter()", not on "csv.writer()".
        #
        # Hence, if the first item of "file_content" corresponds to column
        # headers, it is written as a normal row along with the rest of the
        # items; there is no need to treat the first item separately.
        csv_content.writerows(file_content)
        csv_file.seek(0)  # go to the start of the stream

        # Write the file into the file system (newline='' prevents the
        # "\r\n" line terminator from being translated again on Windows)
        with open(file_name_with_extension, 'w', newline='') as _file:
            _file.write(csv_file.read())
        file = csv_file

        # Get the file size
        csv_file.seek(0)  # go to the start of the stream
        output = io.BytesIO(csv_file.read().encode('utf8'))
        file_size = output.getbuffer().nbytes

    elif FILE_FORMAT == "XLSX":
        # https://xlsxwriter.readthedocs.io/tutorial01.html
        file_name_with_extension = f"{FILE_NAME}.xlsx"

        # Create a workbook and add a worksheet.
        workbook = xlsxwriter.Workbook(file_name_with_extension)
        worksheet = workbook.add_worksheet()

        # Rows and columns are zero indexed.
        for row_idx, row_columns in enumerate(file_content):
            # REMINDER: if the first item of "file_content" corresponds to
            # column headers, it is written as a normal row along with the
            # rest of the items; there is no need to treat it separately.
            for col_idx, col_value in enumerate(row_columns):
                worksheet.write(row_idx, col_idx, col_value)

        # Close the workbook (this also writes the file to disk)
        workbook.close()
        file = workbook

    else:
        raise Exception(
            f"The file format {FILE_FORMAT} cannot be handled. "
            f"It must be either the ENUM option \"CSV\" or \"XLSX\".")

    print("File \"%s\" created successfully!\n" % (file_name_with_extension))
    print("File size: %s bytes\n" % (file_size))
    print("File: %s\n" % (file))


if __name__ == '__main__':
    # Perform the authentication request to obtain the temporary access token
    token_url = 'https://auth.tbretail.com/oauth2/token'
    headers = {'Content-Type': 'application/x-www-form-urlencoded'}
    data = {
        'grant_type': 'refresh_token',
        'client_id': client_id,
        'redirect_uri': redirect_uri,
        'refresh_token': refresh_token
    }
    # NOTE: verify=False disables TLS certificate verification
    token_response = requests.post(token_url, data=data, verify=False,
                                   allow_redirects=False, headers=headers)
    access_token = token_response.json()['access_token']

    # With the temporary token (access_token) we can now call the GraphQL API
    api_url = 'https://api.tbretail.com'
    headers = {'Authorization': access_token}

    # The start_date and end_date fields should be adjusted to the client's
    # needs
    json_data = {
        "query": """
            query {
                getData(
                    companies: [96]
                    brands: []
                    locations: []
                    zones: []
                    start_date: "2022-05-31"
                    end_date: "2022-05-31"
                    metrics: [{name: PASSERSBY, operation: SUM}]
                    category: {dimension: HOUR interval: 1}
                    group: {dimension: LOCATION}
                ) {
                    series {
                        group
                        metric
                        data
                    }
                    categories
                }
            }
        """
    }
    data_response = requests.post(api_url, json=json_data, headers=headers)

    # Once the data has been obtained it can be transformed into a list
    json_data = json.loads(data_response.content)
    result_getdata = json_data['data']['getData']
    if result_getdata:
        create_file(result_getdata)

        # Also echo the rows to stdout
        series = result_getdata['series']
        categories = result_getdata['categories']
        print("group_id,start_datetime,metric,value")
        for serie in series:
            group_id = serie['group']
            metric = serie['metric']
            serie_data = serie['data']
            for idx, category in enumerate(categories):
                unixdate = int(category) / 1000
                starttime = datetime.datetime.utcfromtimestamp(unixdate)
                value = serie_data[idx]
                print("%s,%s,%s,%s" % (
                    group_id,
                    starttime.strftime("%Y-%m-%d %H:%M:%S"),
                    metric,
                    value))
    else:
        print("No hay datos")
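If you prefer not to hard-code the dates, metric, and grouping inside the query string, one possible variation is to build the query with Python's f-string formatting. This is a sketch on top of the script above: the build_query helper is our own addition, not part of the tbRetail API, and it simply reproduces the getData query shown earlier with the variable parts injected.

import datetime

# Hypothetical helper (not part of the original script): builds the same
# getData query as above, with the variable parts injected via f-string.
def build_query(start_date, end_date, metric="PASSERSBY", dimension="HOUR"):
    # Literal braces in the GraphQL body are doubled ({{ }}) so that the
    # f-string leaves them intact.
    return f"""
    query {{
        getData(
            companies: [96]
            brands: []
            locations: []
            zones: []
            start_date: "{start_date}"
            end_date: "{end_date}"
            metrics: [{{name: {metric}, operation: SUM}}]
            category: {{dimension: {dimension} interval: 1}}
            group: {{dimension: LOCATION}}
        ) {{
            series {{ group metric data }}
            categories
        }}
    }}
    """

# Example: download yesterday's ENTERS grouped by day.
yesterday = (datetime.date.today() - datetime.timedelta(days=1)).isoformat()
json_data = {"query": build_query(yesterday, yesterday, metric="ENTERS", dimension="DAY")}

With this helper the rest of the script stays unchanged: pass the resulting json_data dictionary to requests.post() exactly as above.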