service.py 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259
  1. # -*- coding: utf8 -*-
  2. from __future__ import print_function, absolute_import
  3. from tornado.ioloop import IOLoop
  4. from tornado.web import Application
  5. from fourier.api.client import Client, ConnectionError
  6. from fourier.boxconfig import parse_config
  7. from fourier.dejavu.recognize import FilePerSecondRecognizer
  8. from datetime import datetime, timedelta
  9. from ondemand.endpoint import setup_endpoint
  10. from fourier.dejavu import Dejavu
  11. from Queue import Queue, Empty
  12. from firebase_admin import credentials
  13. from firebase_admin import db as fbdb
  14. from binascii import hexlify
  15. from base64 import b64decode
  16. import logging as log
  17. import firebase_admin
  18. import mutagen.mp3
  19. import requests
  20. import dateutil
  21. import sqlite3
  22. import time
  23. import sys
  24. import os
# Log lines are emitted as "[timestamp] message" at INFO level and above.
log.basicConfig(format='[%(asctime)s] %(message)s', level=log.INFO)

# Directory where downloaded ad audio files are written (see cloud_download).
AUDIOS_PATH = '/tmp'
# Seconds subtracted from the start of the search window in iterate_audios.
AHEAD_TIME_AUDIO_TOLERANCE = 2 # seconds

# Box configuration; the keys read in this module are 'device_id',
# 'apiSecret', 'bucket', 'firebase', and the optional 'basepath' and
# 'localDatabase'.
config = parse_config()
# In-memory queue of pending schedule items; filled by feed_queue, drained
# by process_queue, and also handed to setup_endpoint at the bottom of the
# file.
queue = Queue()
# API client used to fetch pending schedule items and to report results.
client = Client(config['device_id'],
                config['apiSecret'])
# Base URL of the storage bucket from which ad audio files are downloaded.
cloud_base_url = 'https://storage.googleapis.com/{}'\
                 .format(config['bucket'])
base_path = config.get("basepath", "/var/fourier")
# Firebase is initialised once at import time; cloud_download reads ad
# records through ``fbdb``.
fb_credentials = credentials.Certificate('/etc/Fourier-key.json')
firebase_admin.initialize_app(fb_credentials, config['firebase'])
# Fingerprint engine; "mem" presumably selects an in-memory database --
# TODO confirm against fourier.dejavu.
dejavu = Dejavu({"database_type":"mem"})
device_id = config['device_id']
device_path = os.path.join(base_path, device_id)
# Recognizer class (not an instance) passed to dejavu.recognize.
recognizer = FilePerSecondRecognizer
# Local SQLite database queried by iterate_audios for recorded audio files;
# defaults to <device_path>/files.db.
db_path = config.get('localDatabase', os.path.join(device_path, 'files.db'))
db = sqlite3.connect(db_path)
# Maps an ad key to the (local file path, md5 hex digest) tuple produced by
# cloud_download, so an ad is downloaded at most once while its file exists.
cloud_cache = {}
  44. def feed_queue():
  45. """ Search for pending scheduled work in
  46. server and add them to a memory queue. """
  47. try:
  48. response = client.get_schedule_pending()
  49. downloaded_counter = len(response['items'])
  50. for item in response['items']:
  51. queue.put(item)
  52. if downloaded_counter:
  53. log.info(('[feed_queue] {} new '
  54. + 'pending schedule items.')\
  55. .format(downloaded_counter)
  56. )
  57. if queue.qsize() > 0:
  58. loop.add_callback(process_queue)
  59. else:
  60. loop.add_timeout(time.time() + 30, feed_queue)
  61. except ConnectionError as ex:
  62. log.error('[feed_queue] cannot feed: {}, retryig later'.format(ex))
  63. loop.add_timeout(time.time() + 15, feed_queue)
  64. except Exception as ex:
  65. """ Errores desconocidos """
  66. log.error('[feed_queue] {}'.format(ex))
  67. loop.add_timeout(time.time() + 60, feed_queue)
  68. def process_queue():
  69. """ Try to the next item in a queue and start
  70. processing it accordingly. If success, repeat
  71. the function or go to feed if no more items. """
  72. try:
  73. item = queue.get(False)
  74. process_segment(item)
  75. loop.add_callback(process_queue)
  76. except Empty:
  77. loop.add_callback(feed_queue)
  78. except Exception as ex:
  79. log.error(ex)
  80. loop.add_callback(process_queue)
  81. def process_segment(item):
  82. """ Procesa una hora de audio """
  83. station = item['station']
  84. date = dateutil.parser.parse(item['date'])
  85. log.info('processing segment: {}'.format(item))
  86. # 1. obtener el audio desde firebase
  87. # y calcular su fingerprint.
  88. filename, md5hash = cloud_download(ad_key=item['ad'])
  89. if not filename:
  90. log.info('ad file missing')
  91. return
  92. # 1.1 Calcular el número de segmentos requeridos
  93. # de acuerdo a la duración total del audio.
  94. try:
  95. audio = mutagen.mp3.MP3(filename)
  96. segments_needed = int(round(float(audio.info.length) / float(5)))
  97. except Exception as ex:
  98. log.error('file {} is not an mp3'.format(audio))
  99. log.error(str(ex))
  100. return
  101. try:
  102. dejavu.fingerprint_file(filename)
  103. except Exception as ex:
  104. log.error('cannot fingerprint: {}'.format(ex))
  105. # 2. Read the list of files from local database
  106. audios_counter = 0
  107. results = []
  108. for path, name, ts in iterate_audios(date, station):
  109. log.info('file: {}'.format(path))
  110. audios_counter += os.path.isfile(path)
  111. for match in dejavu.recognize(recognizer, path, 5,
  112. ads_filter=[md5hash]):
  113. try:
  114. results.append({
  115. 'confidence': match['confidence'],
  116. 'timestamp': ts,
  117. 'offset': match['offset']
  118. })
  119. log.info("{} {}".format(ts, match['confidence']))
  120. except KeyError as ex:
  121. log.error(str(ex))
  122. ts += match['length'] / 1000
  123. try:
  124. response = client.put_schedule_results(
  125. item['schedule'],
  126. item['id'],
  127. None, # TODO: send results again
  128. found=find_repetitions(results,
  129. segments_needed=segments_needed
  130. ),
  131. missing_files=(12 - audios_counter) \
  132. if audios_counter < 12 else 0
  133. )
  134. log.info('API response: {}'.format(response))
  135. except ConnectionError as ex:
  136. log.error(str(ex))
  137. except UserWarning as warn:
  138. log.warning(str(warn))
  139. def find_repetitions(results, segments_needed=2):
  140. found_counter = 0
  141. found_index = None
  142. seconds_needed = 9
  143. threshold = 20
  144. expect_space = False
  145. found = []
  146. if segments_needed < 1:
  147. segments_needed = 1
  148. for index, result in enumerate(results):
  149. if not expect_space:
  150. if result['confidence'] > threshold:
  151. found_counter += 1
  152. if found_index is None:
  153. found_index = index
  154. else:
  155. found_counter = 0
  156. found_index = None
  157. else:
  158. if result['confidence'] <= threshold:
  159. expect_space = False
  160. if found_counter >= segments_needed:
  161. found.append(results[found_index]['timestamp'])
  162. found_counter = 0
  163. expect_space = True
  164. return found
  165. def iterate_audios(dt, station):
  166. """ Given a datetime object and an station,
  167. iterate a list of files that are between
  168. the the date and itself plus 5 minutes;
  169. station must match too """
  170. from_time = time.mktime(dt.timetuple()) \
  171. - AHEAD_TIME_AUDIO_TOLERANCE
  172. to_time = from_time + 3599 + AHEAD_TIME_AUDIO_TOLERANCE
  173. log.info('from {} to {}'.format(int(from_time), int(to_time)))
  174. cursor = db.cursor()
  175. cursor.execute((
  176. 'select "filename", "timestamp" '
  177. 'from "file" '
  178. 'where "timestamp" between ? and ? '
  179. 'and "station" = ? '
  180. 'order by "timestamp" asc'
  181. ),
  182. (from_time, to_time, station, ),
  183. )
  184. files = [file for file in cursor]
  185. cursor.close()
  186. for mp3 in files:
  187. mp3path, ts = mp3
  188. mp3name = os.path.basename(mp3path)
  189. yield (mp3path, mp3name, ts)
  190. def cloud_download(ad_key=None):
  191. """ Given an ad key, the file is downloaded to
  192. the system temporal folder to be processed """
  193. if ad_key in cloud_cache:
  194. """ If this file has already been downloaded,
  195. will not be downloaded again, instead will
  196. be taken from cloud_cache dictionary """
  197. filename, md5hash = cloud_cache[ad_key]
  198. if os.path.isfile(filename):
  199. return filename, md5hash
  200. ad = fbdb.reference('ads/{}'.format(ad_key)).get()
  201. filename = os.path.basename(ad['path'])
  202. out_file = os.path.join(AUDIOS_PATH, filename)
  203. url = '{}/{}'.format(cloud_base_url, ad['path'])
  204. response = requests.get(url)
  205. if response.status_code == 200:
  206. hashes = response.headers['x-goog-hash']
  207. hashes = hashes.split(',')
  208. hashes = [h.split('=', 1) for h in hashes]
  209. hashes = {h[0].strip(): hexlify(b64decode(h[1])) for h in hashes}
  210. md5sum = hashes['md5']
  211. with open(out_file, "wb") as fp:
  212. fp.write(response.content)
  213. tp = (out_file, md5sum,)
  214. cloud_cache[ad_key] = tp
  215. return tp
# Build the endpoint application, handing it the shared work queue
# (presumably so requests can enqueue items -- TODO confirm in
# ondemand.endpoint). ``app`` is not used again in this file.
app = setup_endpoint(queue=queue)
loop = IOLoop.current()
# Schedule the first feed; feed_queue and process_queue keep rescheduling
# each other from then on.
loop.add_callback(feed_queue)

if __name__ == '__main__':
    try:
        log.info('Starting ondemand service')
        # Blocks until the loop is stopped or the process is interrupted.
        loop.start()
    except KeyboardInterrupt:
        log.error('Process killed')