ondemand.py 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256
  1. from __future__ import print_function, absolute_import
  2. from tornado.ioloop import IOLoop
  3. from client import Client, ConnectionError
  4. from boxconfig import parse_config
  5. from dejavu.recognize import FilePerSecondRecognizer
  6. from dejavu import Dejavu, CouldntDecodeError
  7. from endpoint import setup_endpoint
  8. from multiprocessing import Process
  9. import logging as log
  10. import requests
  11. import dateutil
  12. import math
  13. import time
  14. import os
  15. from queue import Queue, Empty
  # Log lines carry a timestamp and the name of the emitting module.
  log.basicConfig(
      level=log.INFO,
      format='[%(asctime)s] [%(module)s] %(message)s',
  )

  # Base directory under which downloaded ads and media are cached.
  PATH = '/tmp'

  # Tuning constants; none of them is referenced by the code visible in
  # this section of the file.
  AHEAD_TIME_AUDIO_TOLERANCE = 2  # seconds
  MAX_SEGMENT_THREADS = 4
  THRESHOLD = 10
  SEGMENTS_TOLERANCE_RATE = 0.6
  FALL_TOLERANCE_SEGMENTS = 1

  # Possible values for ``threshold_mode``.
  THRESHOLD_FIXED = 1
  THRESHOLD_AVERAGE = 2

  QUEUE_SINGLE = 1

  # Shared module state used by the functions below.
  config = parse_config()
  queue = Queue()
  cloud_base_url = 'https://storage.googleapis.com/{}'.format(config['bucket'])
  recognizer = FilePerSecondRecognizer
  threshold_mode = THRESHOLD_FIXED
  def obt_siguiente_trabajo(timeout=30):
      """Fetch the pending calendar work for this device from the API.

      Args:
          timeout: Seconds to wait for the server before giving up. Without
              a timeout a stalled connection would block the caller (and the
              IOLoop it runs on) indefinitely.

      Returns:
          The decoded JSON body of the response.

      Raises:
          requests.exceptions.RequestException: On network failure or timeout.
          ValueError: When the response body is not valid JSON.
      """
      url = 'https://api.fourier.audio/na/calendario/pendiente?id=%s' % (config['device_id'],)
      response = requests.get(url, timeout=timeout)
      # Decode once and reuse the result for both the log line and the return.
      trabajo = response.json()
      log.info(trabajo)
      return trabajo
  def descargar_anuncio(ad_path, timeout=30):
      """Download an ad from cloud storage into the local ads cache.

      The file is stored under ``PATH/ads`` using the base name of *ad_path*.
      A file that is already present is returned without contacting the
      server.

      Args:
          ad_path: Path of the ad, appended to ``cloud_base_url``.
          timeout: Seconds to wait for the server before giving up.

      Returns:
          The local path of the ad, or None when the server did not answer
          with status 200.

      Raises:
          requests.exceptions.RequestException: On network failure or timeout.
          OSError: When the cache directory or file cannot be written.
      """
      carpeta = os.path.join(PATH, 'ads')
      os.makedirs(carpeta, exist_ok=True)
      ruta_anuncio = os.path.join(carpeta, os.path.basename(ad_path))
      if os.path.isfile(ruta_anuncio):
          return ruta_anuncio

      url = '{}/{}'.format(cloud_base_url, ad_path)
      response = requests.get(url, timeout=timeout)
      if response.status_code != 200:
          # Report through the module logger so the failure is visible next
          # to the rest of the service output.
          log.error('Error al descargar %s: %s', url, response)
          return None

      # Write to a temporary name and rename afterwards: an interrupted write
      # must not leave a truncated file that later calls would treat as a
      # valid cached ad.
      ruta_temporal = ruta_anuncio + '.part'
      with open(ruta_temporal, "wb") as fp:
          fp.write(response.content)
      os.replace(ruta_temporal, ruta_anuncio)
      return ruta_anuncio
  def descargar_media(box, station, media, timeout=30):
      """Download a media file from cloud storage into the local cache.

      The file is stored under ``PATH/fourier/<box>/<station>`` using the base
      name of *media*. A file that is already present is returned without
      contacting the server.

      Args:
          box: First component of the remote object reference.
          station: Second component of the remote object reference.
          media: Last component of the remote object reference.
          timeout: Seconds to wait for the server before giving up.

      Returns:
          The local path of the file, or None when the server did not answer
          with status 200.

      Raises:
          requests.exceptions.RequestException: On network failure or timeout.
          OSError: When the cache directory or file cannot be written.
      """
      ref = '{}/{}/{}'.format(box, station, media)
      carpeta = os.path.join(PATH, 'fourier', box, station)
      os.makedirs(carpeta, exist_ok=True)
      out_file = os.path.join(carpeta, os.path.basename(ref))
      if os.path.isfile(out_file):
          return out_file

      # The reference is sent as a single path segment, so "/" and "+" are
      # percent-encoded by hand.
      filename = ref.replace("/", "%2F").replace("+", "%2B")
      url = '{}/{}'.format(cloud_base_url, filename)
      response = requests.get(url, timeout=timeout)
      if response.status_code != 200:
          # Report through the module logger so the failure is visible next
          # to the rest of the service output.
          log.error('Error al descargar %s: %s', url, response)
          return None

      # Write to a temporary name and rename afterwards: an interrupted write
      # must not leave a truncated file that later calls would treat as a
      # valid cached file.
      archivo_temporal = out_file + '.part'
      with open(archivo_temporal, "wb") as fp:
          fp.write(response.content)
      os.replace(archivo_temporal, out_file)
      return out_file
  def obt_calibracion(calibracion):
      """Merge a calibration mapping over the built-in defaults.

      Only the keys ``threshold``, ``tolerance``, ``fallTolerance`` and
      ``segmentSize`` are taken from *calibracion*; any other key is ignored.

      Args:
          calibracion: Mapping with calibration overrides, or None to get the
              defaults unchanged.

      Returns:
          A new dict holding the four calibration values.
      """
      ajustes = {
          'threshold': 12,
          'tolerance': 0.8,
          'fallTolerance': 1,
          'segmentSize': 5,
      }
      # A missing calibration simply means "use the defaults".
      if calibracion is None:
          return ajustes
      for clave in list(ajustes):
          if clave in calibracion:
              ajustes[clave] = calibracion[clave]
      return ajustes
  def enviar_resultados(item, timeout=30):
      """POST a result item as JSON to the calendar results endpoint.

      Args:
          item: JSON-serialisable object sent as the request body.
          timeout: Seconds to wait for the server before giving up. Without
              a timeout a stalled connection would block the caller
              indefinitely.

      Returns:
          The ``requests`` response object; the status code is not checked
          here.

      Raises:
          requests.exceptions.RequestException: On network failure or timeout.
      """
      url = 'https://api.fourier.audio/v1/calendario/resultado'
      return requests.post(url, json=item, timeout=timeout)
  def llenar_pila():
      """Fetch pending scheduled work from the server and queue it in memory.

      When the queue holds work afterwards, processing is scheduled
      immediately; otherwise this function is scheduled again in 30 seconds.
      Connection problems are retried after 15 seconds. Any other error is
      logged, a retry is scheduled after 60 seconds and the error is
      re-raised.
      """
      try:
          response = obt_siguiente_trabajo()
          if response["elementos"]:
              queue.put(response)

          if queue.qsize() > 0:
              loop.add_callback(procesar_siguiente_pila)
          else:
              loop.add_timeout(time.time() + 30, llenar_pila)
      except (ConnectionError, requests.exceptions.RequestException) as ex:
          # The job is fetched with ``requests``, whose network errors are not
          # subclasses of the ``ConnectionError`` imported from ``client``;
          # both are listed so network failures take the short retry path.
          log.error('[feed_queue] cannot feed: {}, retrying later'.format(ex))
          loop.add_timeout(time.time() + 15, llenar_pila)
      except Exception as ex:
          # Unknown errors: schedule a later retry, then let the error surface.
          log.error('[feed_queue] {}'.format(ex))
          loop.add_timeout(time.time() + 60, llenar_pila)
          raise
  def procesar_siguiente_pila():
      """Process the next queued job and schedule the follow-up step.

      After a job has been handled, or has failed with a logged error, this
      function is scheduled again. When the queue is empty, ``llenar_pila``
      is scheduled instead to look for more work.
      """
      try:
          # Non-blocking read: an empty queue raises Empty instead of waiting.
          procesar_trabajo(queue.get(block=False))
          loop.add_callback(procesar_siguiente_pila)
      except Empty:
          loop.add_callback(llenar_pila)
      except Exception as fallo:
          log.error(fallo)
          loop.add_callback(procesar_siguiente_pila)
  def procesar_trabajo(pendiente):
      """Download a job's ads and media, then match the ads against the media.

      Every media file is fingerprinted into an in-memory Dejavu instance and
      every downloaded ad is run through the recognizer against it. Matches
      with a confidence of at least 35 are accumulated per match id.

      Args:
          pendiente: Job mapping. The code reads the keys ``origen``,
              ``estacion``, ``elementos`` (items with a ``ruta``) and
              ``media`` (items with ``ruta``, ``fecha`` and ``timestamp``).

      Raises:
          KeyError: When ``origen`` or ``estacion`` is missing. Errors in the
              download, fingerprint and recognition stages are printed and
              not propagated.
      """
      confianza_minima = 35

      ciudad = pendiente['origen']
      estacion = pendiente['estacion']

      # Ad download. A failure stops this stage but keeps the ads that were
      # already fetched.
      anuncios = []
      try:
          for elemento in pendiente["elementos"]:
              anuncio = descargar_anuncio(elemento["ruta"])
              if anuncio is not None:
                  anuncios.append(anuncio)
      except Exception as err:
          print('[process_segment] [{}] {}'.format(estacion, err))

      # Media download, with the same best-effort policy.
      media = []
      try:
          for elemento in pendiente["media"]:
              archivo = descargar_media(ciudad, estacion, elemento["ruta"])
              if archivo is not None:
                  media.append((archivo, elemento["fecha"], elemento["timestamp"]))
      except Exception as err:
          print(err)

      # NOTE(review): the accumulated results are neither returned nor passed
      # to enviar_resultados in this function - confirm whether that is
      # intended.
      resultados = {}
      try:
          dejavu = Dejavu({"database_type": "mem"})
          try:
              for ruta, _fecha, _ts in media:
                  print("Huellando %s" % (ruta,))
                  dejavu.fingerprint_file(ruta)
          except Exception as ex:
              # Print the exception bound by this handler. Python 3 unbinds
              # the name of an earlier "except ... as err", so referring to
              # "err" here would raise UnboundLocalError.
              print(ex)

          for anuncio in anuncios:
              for coincidencia in dejavu.recognize(recognizer, anuncio, 5):
                  if "id" not in coincidencia:
                      continue
                  if coincidencia["confidence"] < confianza_minima:
                      continue

                  print(coincidencia)
                  coincidencia["match_time"] = None
                  registro = {
                      "id": coincidencia["id"],
                      "anuncio": anuncio,
                      "fecha": coincidencia["name"],
                      "confianza": coincidencia["confidence"],
                      "longitud": coincidencia["length"],
                      "desfase_segundos": coincidencia["offset_seconds"],
                  }

                  # Repeated matches for one id add up their length and
                  # confidence; the first match supplies the other fields.
                  previo = resultados.get(registro["id"])
                  if previo is not None:
                      previo["longitud"] = previo["longitud"] + registro["longitud"]
                      previo["confianza"] = previo["confianza"] + registro["confianza"]
                      continue

                  resultados[registro["id"]] = registro
      except Exception as ex:
          print(ex)
  # setup_endpoint receives the same in-memory queue that the workers consume.
  app = setup_endpoint(queue=queue)

  # The first queue fill is scheduled at import time; it only runs once the
  # loop has been started.
  loop = IOLoop.current()
  loop.add_callback(llenar_pila)

  if __name__ == '__main__':
      try:
          log.info('Starting ondemand service')
          loop.start()
      except KeyboardInterrupt:
          log.error('Process killed')