Ejemplo Python para descargar en fichero CSV
Este es un ejemplo de programa hecho en Python 3. Si usted lo copia y sustituye el valor de la variable ‘refresh_token’ por el valor obtenido en su usuario desde la aplicación tbRetail, el programa le generará un fichero llamado ‘tbretail_data.csv’ con los datos solicitados.
Consideraciones:
En la variable FILE_FORMAT usted puede cambiar el formato del fichero de salida a CSV o XLSX.
En los parámetros start_date y end_date de la consulta GraphQL puede cambiar las fechas de inicio y final por las que le interesen a usted.
En el parámetro metrics de la consulta puede cambiar la métrica de descarga por la que le interese: ENTERS, EXITS, PASSERSBY, etc.
En el parámetro category de la consulta puede cambiar la agrupación temporal de los datos que descarga: HOUR para agrupación por hora, DAY para agrupación por día, MONTH para agrupación por mes, y así sucesivamente.
En el caso de que el período solicitado no devuelva resultados, el programa mostrará el mensaje ‘No hay datos’.
import csv
import datetime
import io
import json
import os

import requests
import xlsxwriter
# OAuth2 credentials: replace refresh_token with the value obtained for your
# user from the tbRetail application.
refresh_token = "poner su refresh_token"
# Public client id and redirect URI of the tbRetail login application.
client_id = '63h4jubc7qvrg9f65c36vjp3bq'
redirect_uri = 'https://login.tbretail.com/mytoken/index.html'
DEFAULT_FILE_NAME = "tbretail_data"
# Set file config (the file will be created into the directory where this script has been executed from)
FILE_FORMAT = "CSV" # "CSV" || "XLSX"
# NOTE(review): this flag is not referenced anywhere in the visible script --
# confirm whether the header-row toggle is actually wired up.
FILE_IS_INCLUDE_HEADER = True
FILE_NAME = DEFAULT_FILE_NAME # Do not include the file extension
FILE__CSV_DELIMITER__STRING = "\"" # Only allowed one-character strings
FILE__CSV_DELIMITER__FIELD = ";" # Only allowed one-character strings
def create_file_content(result_getdata, include_header=True):
    """Build tabular file content from a tbRetail getData result.

    Parameters:
        result_getdata: dict with keys 'series' (list of dicts holding
            'group', 'metric' and 'data') and 'categories' (list of Unix
            timestamps in milliseconds, parallel to each serie's 'data').
        include_header: when True (default), the first row is the column
            header ["group_id", "start_datetime", "metric", "value"].

    Returns:
        A list of rows (each row a list), one row per (serie, category)
        pair, plus the optional leading header row:

            [
                [ "group_id", "start_datetime", "metric", "value" ],  # optional
                [ group_id, "YYYY-MM-DD HH:MM:SS", metric, value ],
                ...
            ]

    Fixes vs. the original version:
        - The header list had a missing comma ("value" "end_date"), which
          Python silently concatenated into a single bogus column name
          "valueend_date"; rows only ever carried 4 values, so the header
          is now the matching 4 columns.
        - The header was built but never added to the output; it is now
          emitted (controllable via `include_header`).
        - `categories.index(category)` inside the loop was O(n^2); replaced
          with `enumerate`.
    """
    columns = ["group_id", "start_datetime", "metric", "value"]

    file_content = []
    if include_header:
        file_content.append(list(columns))

    series = result_getdata['series']
    categories = result_getdata['categories']
    for serie in series:
        group_id = serie['group']
        metric = serie['metric']
        serie_data = serie['data']
        # categories[i] is the start timestamp (ms) for serie_data[i].
        for index, category in enumerate(categories):
            unixdate = int(category) / 1000  # ms -> seconds
            # tz-aware replacement for the deprecated utcfromtimestamp();
            # produces the same "YYYY-MM-DD HH:MM:SS" string.
            starttime = datetime.datetime.fromtimestamp(
                unixdate, tz=datetime.timezone.utc)
            file_content.append([
                group_id,
                starttime.strftime("%Y-%m-%d %H:%M:%S"),
                metric,
                serie_data[index],
            ])
    return file_content
def create_file(result_getdata):
    """Write the getData result to disk as CSV or XLSX (per FILE_FORMAT).

    The file is created in the current working directory as
    "<FILE_NAME>.csv" or "<FILE_NAME>.xlsx". Prints the file name, its
    size in bytes and the writer object; returns None.

    Raises:
        Exception: if FILE_FORMAT is neither "CSV" nor "XLSX".

    Fixes vs. the original version:
        - The CSV was first written to a StringIO and then re-written
          through `open(..., 'w')` without `newline=''`; on Windows the
          text layer re-translates the writer's "\\r\\n" terminator into
          "\\r\\r\\n". The CSV is now written directly to the file opened
          with `newline=''` (as the csv docs require).
        - The XLSX branch never computed `file_size` (it always printed 0);
          both branches now measure the finished file with os.path.getsize.
    """
    # Build the rows (optionally including a leading header row).
    file_content = create_file_content(result_getdata)

    file = None

    if FILE_FORMAT == "CSV":
        # https://docs.python.org/3/library/csv.html
        file_name_with_extension = f"{FILE_NAME}.csv"
        # newline='' lets the csv writer control line endings itself.
        with open(file_name_with_extension, 'w', newline='', encoding='utf8') as csv_file:
            csv_content = csv.writer(
                csv_file,
                delimiter=FILE__CSV_DELIMITER__FIELD,    # one-character string
                quotechar=FILE__CSV_DELIMITER__STRING,   # one-character string
                doublequote=True,
                escapechar=None,
                quoting=csv.QUOTE_MINIMAL,
                lineterminator="\r\n",
                skipinitialspace=False,
                strict=False,
            )
            # A header row (if present) is simply the first item of
            # file_content, so writerows() handles everything; the
            # dedicated writeheader() only exists on csv.DictWriter.
            csv_content.writerows(file_content)
        file = csv_file

    elif FILE_FORMAT == "XLSX":
        # https://xlsxwriter.readthedocs.io/tutorial01.html
        file_name_with_extension = f"{FILE_NAME}.xlsx"
        workbook = xlsxwriter.Workbook(file_name_with_extension)
        worksheet = workbook.add_worksheet()
        # Rows and columns are zero indexed; a header row (if present) is
        # written like any other row.
        for row_idx, row_columns in enumerate(file_content):
            for col_idx, col_value in enumerate(row_columns):
                worksheet.write(row_idx, col_idx, col_value)
        workbook.close()  # flushes the workbook to disk
        file = workbook

    else:
        raise Exception(
            f"The file format {FILE_FORMAT} cannot be handled. It must be either the ENUM option \"CSV\" or \"XLSX\".")

    # Measure the finished file on disk (works for both formats).
    file_size = os.path.getsize(file_name_with_extension)

    print("File \"%s\" created successfully!\n" % (file_name_with_extension))
    print("File size: %s bytes\n" % (file_size))
    print("File: %s\n" % (file))
if __name__ == '__main__':
    # --- Authentication -----------------------------------------------------
    # Exchange the long-lived refresh_token for a temporary access token.
    token_url = 'https://auth.tbretail.com/oauth2/token'
    headers = {'Content-Type': 'application/x-www-form-urlencoded'}
    data = {
        'grant_type': 'refresh_token',
        'client_id': client_id,
        'redirect_uri': redirect_uri,
        'refresh_token': refresh_token,
    }
    # SECURITY NOTE(review): verify=False disables TLS certificate
    # validation. Kept from the original script, but it should be removed
    # unless a broken proxy/CA genuinely requires it.
    token_response = requests.post(
        token_url, data=data, verify=False, allow_redirects=False,
        headers=headers, timeout=30)
    token_response.raise_for_status()  # fail early with a clear HTTP error
    access_token = token_response.json()['access_token']

    # --- Data download ------------------------------------------------------
    # With the temporary access_token we can call the GraphQL endpoint.
    api_url = 'https://api.tbretail.com'
    headers = {
        'Authorization': access_token
    }
    # Adjust start_date/end_date, metrics and category below to your needs.
    json_data = {
        "query": """
            query {
                getData(
                    companies: [96]
                    brands: []
                    locations: []
                    zones: []
                    start_date: "2022-05-31"
                    end_date: "2022-05-31"
                    metrics: [{name: PASSERSBY, operation: SUM}]
                    category: {dimension: HOUR interval: 1}
                    group: {dimension: LOCATION}
                ) {
                    series {
                        group
                        metric
                        data
                    }
                    categories
                }
            }
        """
    }
    data_response = requests.post(api_url, json=json_data, headers=headers, timeout=60)
    data_response.raise_for_status()
    result_getdata = data_response.json()['data']['getData']

    if result_getdata:
        # Write the CSV/XLSX file, then echo the same rows to stdout.
        create_file(result_getdata)
        series = result_getdata['series']
        categories = result_getdata['categories']
        print("group_id,start_datetime,metric,value")
        for serie in series:
            group_id = serie['group']
            metric = serie['metric']
            serie_data = serie['data']
            # categories[i] is the start timestamp (ms) for serie_data[i].
            for index, category in enumerate(categories):
                unixdate = int(category) / 1000  # ms -> seconds
                starttime = datetime.datetime.fromtimestamp(
                    unixdate, tz=datetime.timezone.utc)
                line = "%s,%s,%s,%s" % (
                    group_id, starttime.strftime("%Y-%m-%d %H:%M:%S"),
                    metric, serie_data[index])
                print(line)
    else:
        print("No hay datos")