Browse Source

Download file to compare

Hugo 5 years ago
parent
commit
820c2f6827
1 changed files with 60 additions and 40 deletions
  1. 60 40
      ondemand/service.py

+ 60 - 40
ondemand/service.py

@@ -38,7 +38,7 @@ else:
 log.basicConfig(format='[%(asctime)s] [%(module)s] %(message)s', level=log.INFO)
 
 AUDIOS_PATH = '/tmp'
-AHEAD_TIME_AUDIO_TOLERANCE = 2 # second
+AHEAD_TIME_AUDIO_TOLERANCE = 2  # second
 MAX_SEGMENT_THREADS = 4
 THRESHOLD = 10
 SEGMENTS_TOLERANCE_RATE = 0.6
@@ -48,11 +48,11 @@ FALL_TOLERANCE_SEGMENTS = 1
 THRESHOLD_FIXED = 1
 THRESHOLD_AVERAGE = 2
 
-# Modos de procesamiento de queue
-#  - QUEQUE_SINGLE: procesa solo un segmento a la vez
+# Modos de procesamiento de queue
+#  - QUEQUE_SINGLE: procesa solo un segmento a la vez
 #  - QUEUE_THREAD:  inicia un hilo para cada segmento
 # Por default se usará el threaded.
-# TOOD: hacerlo configurable por medio de argumentos
+# TODO: hacerlo configurable por medio de argumentos
 #       de ejecución.
 QUEUE_SINGLE = 1
 QUEUE_THREAD = 2
@@ -65,8 +65,8 @@ config = parse_config()
 queue = Queue()
 client = Client(config['device_id'],
                 config['apiSecret'])
-cloud_base_url = 'https://storage.googleapis.com/{}'\
-                .format(config['bucket'])
+cloud_base_url = 'https://storage.googleapis.com/{}' \
+    .format(config['bucket'])
 base_path = config.get("basepath", "/var/fourier")
 fb_credentials = credentials.Certificate('/etc/Fourier-key.json')
 firebase_admin.initialize_app(fb_credentials, config['firebase'])
@@ -84,6 +84,7 @@ db_path = config.get('localDatabase', os.path.join(device_path, 'files.db'))
 db = sqlite3.connect(db_path)
 cloud_cache = {}
 
+
 def feed_queue():
     """ Search for pending scheduled work in
     server and add them to a memory queue. """
@@ -95,9 +96,9 @@ def feed_queue():
 
         if downloaded_counter:
             log.info(('[feed_queue] {} new '
-                    + 'pending schedule items.')\
-                    .format(downloaded_counter)
-                    )
+                      + 'pending schedule items.') \
+                     .format(downloaded_counter)
+                     )
 
         if queue.qsize() > 0:
             if queue_mode == QUEUE_THREAD:
@@ -117,6 +118,7 @@ def feed_queue():
         loop.add_timeout(time.time() + 60, feed_queue)
         raise ex
 
+
 def process_queue():
     """ Try to the next item in a queue and start
     processing it accordingly. If success, repeat
@@ -131,6 +133,7 @@ def process_queue():
         log.error(ex)
         loop.add_callback(process_queue)
 
+
 def process_queue_with_threads():
     threads = [None] * MAX_SEGMENT_THREADS
     is_drained = False
@@ -152,12 +155,12 @@ def process_queue_with_threads():
                     )]
 
                     thread = MultiAPI(target=process_segment,
-                        args=(item,),
-                        kwargs={
-                            'audios': audios,
-                            'calibration': calibration,
-                        }
-                    )
+                                      args=(item,),
+                                      kwargs={
+                                          'audios': audios,
+                                          'calibration': calibration,
+                                      }
+                                      )
                     threads[index] = thread
                     thread.start()
 
@@ -181,6 +184,7 @@ def process_queue_with_threads():
     log.info('Finished thread processing')
     loop.add_callback(feed_queue)
 
+
 def process_segment(item, audios=None, calibration=None):
     """ Procesa una hora de audio """
 
@@ -194,15 +198,15 @@ def process_segment(item, audios=None, calibration=None):
     segment_size = calibration['segmentSize']
     audio_length = 0
 
-    log.info('[process_segment] (th: {}, tl: {}, ft: {}, ss: {}, ho: {}) {}'\
+    log.info('[process_segment] (th: {}, tl: {}, ft: {}, ss: {}, ho: {}) {}' \
         .format(
-            calibration['threshold'],
-            calibration['tolerance'],
-            calibration['fallTolerance'],
-            calibration['segmentSize'],
-            calibration['hourlyOffset'],
-            item,
-        )
+        calibration['threshold'],
+        calibration['tolerance'],
+        calibration['fallTolerance'],
+        calibration['segmentSize'],
+        calibration['hourlyOffset'],
+        item,
+    )
     )
 
     # 1. obtener el audio desde firebase
@@ -235,7 +239,7 @@ def process_segment(item, audios=None, calibration=None):
         log.error(str(ex))
         return
 
-    dejavu = Dejavu({"database_type":"mem"})
+    dejavu = Dejavu({"database_type": "mem"})
     try:
         dejavu.fingerprint_file(filename)
     except Exception as ex:
@@ -264,9 +268,7 @@ def process_segment(item, audios=None, calibration=None):
         values = []
 
         if not os.path.isfile(path):
-            log.error('[process_segment] file not found: {}'\
-                .format(short_path))
-            continue
+            download_file(path)
 
         try:
             for match in dejavu.recognize(recognizer, path, segment_size,
@@ -289,18 +291,18 @@ def process_segment(item, audios=None, calibration=None):
 
         except CouldntDecodeError as ex:
             log.error('[process_segment] {}'.format(ex))
-    
+
     try:
         response = client.put_schedule_results(
             item['schedule'],
             item['id'],
-            None, # TODO: send results again
+            None,  # TODO: send results again
             found=find_repetitions(results,
-                segments_needed=segments_needed,
-                calibration=calibration,
-            ),
+                                   segments_needed=segments_needed,
+                                   calibration=calibration,
+                                   ),
             missing_files=(12 - audios_counter) \
-                          if audios_counter < 12 else 0
+                if audios_counter < 12 else 0
         )
         log.info('[{}] API response: {}'.format(station, response))
     except ConnectionError as ex:
@@ -308,6 +310,7 @@ def process_segment(item, audios=None, calibration=None):
     except UserWarning as warn:
         log.warning(str(warn))
 
+
 def find_repetitions(results, segments_needed=2, calibration=None):
     found_counter = 0
     found_down_counter = 0
@@ -373,6 +376,7 @@ def find_repetitions(results, segments_needed=2, calibration=None):
 
     return found
 
+
 def iterate_audios(dt, station, calibration=None):
     """ Given a datetime object and an station,
     iterate a list of files that are between
@@ -398,10 +402,10 @@ def iterate_audios(dt, station, calibration=None):
         'select "filename", "timestamp" '
         'from "file" '
         'where "timestamp" between ? and ? '
-            'and "station" = ? '
+        'and "station" = ? '
         'order by "timestamp" asc'
-        ),   
-        (from_time, to_time, station, ),
+    ),
+        (from_time, to_time, station,),
     )
     files = [file for file in cursor]
     cursor.close()
@@ -429,7 +433,7 @@ def cloud_download(ad_key=None):
     out_file = os.path.join(AUDIOS_PATH, filename)
     url = '{}/{}'.format(cloud_base_url, ad['path'])
     response = requests.get(url)
-    
+
     if response.status_code == 200:
         hashes = response.headers['x-goog-hash']
         hashes = hashes.split(',')
@@ -440,15 +444,31 @@ def cloud_download(ad_key=None):
         with open(out_file, "wb") as fp:
             fp.write(response.content)
             tp = (out_file, md5sum,)
-            p = Popen(['ffprobe', '-v', 'error', '-select_streams', 'a:0', '-show_entries', 'stream=codec_name', '-of', 'default=nokey=1:noprint_wrappers=1', out_file], stdin=PIPE, stdout=PIPE, stderr=PIPE)
+            p = Popen(['ffprobe', '-v', 'error', '-select_streams', 'a:0', '-show_entries', 'stream=codec_name', '-of',
+                       'default=nokey=1:noprint_wrappers=1', out_file], stdin=PIPE, stdout=PIPE, stderr=PIPE)
             rc = p.returncode
             if rc != 'mp3\n':
                 subprocess.call(['mv', out_file, out_file + '.old'])
-                subprocess.call(['ffmpeg', '-hide_banner', '-loglevel', 'panic', '-i', out_file + '.old', '-f', 'mp3', out_file])
+                subprocess.call(
+                    ['ffmpeg', '-hide_banner', '-loglevel', 'panic', '-i', out_file + '.old', '-f', 'mp3', out_file])
                 subprocess.call(['rm', '-rf', out_file + '.old'])
             cloud_cache[ad_key] = tp
             return tp
 
+
+def download_file(file_path=None):
+    file_path_cloud = file_path.replace("/var/fourier/", "")
+    url = '{}/{}'.format(cloud_base_url, file_path_cloud)
+    response = requests.get(url)
+
+    if response.status_code == 200:
+        with open(file_path, "wb") as fp:
+            fp.write(response.content)
+        cursor = db.cursor()
+        cursor.execute('update "file" set uploaded = 0 where filename = ?', (file_path,), )
+        cursor.close()
+
+
 app = setup_endpoint(queue=queue)
 loop = IOLoop.current()
 loop.add_callback(feed_queue)
@@ -458,4 +478,4 @@ if __name__ == '__main__':
         log.info('Starting ondemand service')
         loop.start()
     except KeyboardInterrupt:
-        log.error('Process killed')
+        log.error('Process killed')