database_sql.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387
  1. from __future__ import absolute_import
  2. from itertools import izip_longest
  3. import Queue
  4. import MySQLdb as mysql
  5. from MySQLdb.cursors import DictCursor
  6. from dejavu.database import Database
  7. class SQLDatabase(Database):
  8. """
  9. Queries:
  10. 1) Find duplicates (shouldn't be any, though):
  11. select `hash`, `song_id`, `offset`, count(*) cnt
  12. from fingerprints
  13. group by `hash`, `song_id`, `offset`
  14. having cnt > 1
  15. order by cnt asc;
  16. 2) Get number of hashes by song:
  17. select song_id, song_name, count(song_id) as num
  18. from fingerprints
  19. natural join songs
  20. group by song_id
  21. order by count(song_id) desc;
  22. 3) get hashes with highest number of collisions
  23. select
  24. hash,
  25. count(distinct song_id) as n
  26. from fingerprints
  27. group by `hash`
  28. order by n DESC;
  29. => 26 different songs with same fingerprint (392 times):
  30. select songs.song_name, fingerprints.offset
  31. from fingerprints natural join songs
  32. where fingerprints.hash = "08d3c833b71c60a7b620322ac0c0aba7bf5a3e73";
  33. """
  34. type = "mysql"
  35. # tables
  36. FINGERPRINTS_TABLENAME = "fingerprints"
  37. SONGS_TABLENAME = "ads"
  38. # fields
  39. FIELD_FINGERPRINTED = "fingerprinted"
  40. # creates
  41. CREATE_FINGERPRINTS_TABLE = """
  42. CREATE TABLE IF NOT EXISTS `%s` (
  43. `%s` binary(10) not null,
  44. `%s` int unsigned not null,
  45. `%s` int unsigned not null,
  46. INDEX (%s),
  47. UNIQUE KEY `unique_constraint` (%s, %s, %s),
  48. FOREIGN KEY (%s) REFERENCES %s(%s) ON DELETE CASCADE
  49. ) ENGINE=INNODB;""" % (
  50. FINGERPRINTS_TABLENAME, Database.FIELD_HASH,
  51. Database.FIELD_SONG_ID, Database.FIELD_OFFSET, Database.FIELD_HASH,
  52. Database.FIELD_SONG_ID, Database.FIELD_OFFSET, Database.FIELD_HASH,
  53. Database.FIELD_SONG_ID, SONGS_TABLENAME, Database.FIELD_SONG_ID
  54. )
  55. CREATE_SONGS_TABLE = """
  56. CREATE TABLE IF NOT EXISTS `%s` (
  57. `%s` int unsigned not null auto_increment,
  58. `%s` varchar(250) not null,
  59. `%s` bigint default 0,
  60. `%s` tinyint default 0,
  61. `%s` binary(20) not null,
  62. PRIMARY KEY (`%s`),
  63. UNIQUE KEY `%s` (`%s`)
  64. ) ENGINE=INNODB;""" % (
  65. SONGS_TABLENAME, Database.FIELD_SONG_ID, Database.FIELD_SONGNAME, Database.FIELD_LENGTH, FIELD_FINGERPRINTED,
  66. Database.FIELD_FILE_SHA1,
  67. Database.FIELD_SONG_ID, Database.FIELD_SONG_ID, Database.FIELD_SONG_ID,
  68. )
  69. # inserts (ignores duplicates)
  70. INSERT_FINGERPRINT = """
  71. INSERT IGNORE INTO %s (%s, %s, %s) values
  72. (UNHEX(%%s), %%s, %%s);
  73. """ % (FINGERPRINTS_TABLENAME, Database.FIELD_HASH, Database.FIELD_SONG_ID, Database.FIELD_OFFSET)
  74. INSERT_SONG = "INSERT INTO %s (%s, %s, %s) values (%%s, %%s, UNHEX(%%s));" % (
  75. SONGS_TABLENAME, Database.FIELD_SONGNAME, Database.FIELD_LENGTH, Database.FIELD_FILE_SHA1)
  76. # selects
  77. SELECT = """
  78. SELECT %s, %s FROM %s WHERE %s = UNHEX(%%s);
  79. """ % (Database.FIELD_SONG_ID, Database.FIELD_OFFSET, FINGERPRINTS_TABLENAME, Database.FIELD_HASH)
  80. SELECT_MULTIPLE = """
  81. SELECT HEX(%s), %s, %s FROM %s WHERE %s IN (%%s);
  82. """ % (Database.FIELD_HASH, Database.FIELD_SONG_ID, Database.FIELD_OFFSET,
  83. FINGERPRINTS_TABLENAME, Database.FIELD_HASH)
  84. SELECT_MULTIPLE_FILTER = """
  85. SELECT HEX(%s), %s, %s FROM %s WHERE %s IN (%%s) AND id IN (%%s);
  86. """ % (Database.FIELD_HASH, Database.FIELD_SONG_ID, Database.FIELD_OFFSET,
  87. FINGERPRINTS_TABLENAME, Database.FIELD_HASH)
  88. SELECT_ALL = """
  89. SELECT %s, %s FROM %s;
  90. """ % (Database.FIELD_SONG_ID, Database.FIELD_OFFSET, FINGERPRINTS_TABLENAME)
  91. SELECT_SONG = """
  92. SELECT %s, HEX(%s) as %s FROM %s WHERE %s = %%s;
  93. """ % (Database.FIELD_SONGNAME, Database.FIELD_FILE_SHA1, Database.FIELD_FILE_SHA1, SONGS_TABLENAME, Database.FIELD_SONG_ID)
  94. SELECT_NUM_FINGERPRINTS = """
  95. SELECT COUNT(*) as n FROM %s
  96. """ % (FINGERPRINTS_TABLENAME)
  97. SELECT_UNIQUE_SONG_IDS = """
  98. SELECT COUNT(DISTINCT %s) as n FROM %s WHERE %s = 1;
  99. """ % (Database.FIELD_SONG_ID, SONGS_TABLENAME, FIELD_FINGERPRINTED)
  100. SELECT_SONGS = """
  101. SELECT %s, %s, HEX(%s) as %s FROM %s WHERE %s = 1;
  102. """ % (Database.FIELD_SONG_ID, Database.FIELD_SONGNAME, Database.FIELD_FILE_SHA1, Database.FIELD_FILE_SHA1,
  103. SONGS_TABLENAME, FIELD_FINGERPRINTED)
  104. # drops
  105. DROP_FINGERPRINTS = "DROP TABLE IF EXISTS %s;" % FINGERPRINTS_TABLENAME
  106. DROP_SONGS = "DROP TABLE IF EXISTS %s;" % SONGS_TABLENAME
  107. # update
  108. UPDATE_SONG_FINGERPRINTED = """
  109. UPDATE %s SET %s = 1 WHERE %s = %%s
  110. """ % (SONGS_TABLENAME, FIELD_FINGERPRINTED, Database.FIELD_SONG_ID)
  111. # delete
  112. DELETE_UNFINGERPRINTED = """
  113. DELETE FROM %s WHERE %s = 0;
  114. """ % (SONGS_TABLENAME, FIELD_FINGERPRINTED)
  115. def __init__(self, **options):
  116. super(SQLDatabase, self).__init__()
  117. self.cursor = cursor_factory(**options)
  118. self._options = options
  119. def after_fork(self):
  120. # Clear the cursor cache, we don't want any stale connections from
  121. # the previous process.
  122. Cursor.clear_cache()
  123. def setup(self):
  124. """
  125. Creates any non-existing tables required for dejavu to function.
  126. This also removes all songs that have been added but have no
  127. fingerprints associated with them.
  128. """
  129. with self.cursor() as cur:
  130. cur.execute(self.CREATE_SONGS_TABLE)
  131. cur.execute(self.CREATE_FINGERPRINTS_TABLE)
  132. cur.execute(self.DELETE_UNFINGERPRINTED)
  133. def empty(self):
  134. """
  135. Drops tables created by dejavu and then creates them again
  136. by calling `SQLDatabase.setup`.
  137. .. warning:
  138. This will result in a loss of data
  139. """
  140. with self.cursor() as cur:
  141. cur.execute(self.DROP_FINGERPRINTS)
  142. cur.execute(self.DROP_SONGS)
  143. self.setup()
  144. def delete_unfingerprinted_songs(self):
  145. """
  146. Removes all songs that have no fingerprints associated with them.
  147. """
  148. with self.cursor() as cur:
  149. cur.execute(self.DELETE_UNFINGERPRINTED)
  150. def get_num_songs(self):
  151. """
  152. Returns number of songs the database has fingerprinted.
  153. """
  154. with self.cursor() as cur:
  155. cur.execute(self.SELECT_UNIQUE_SONG_IDS)
  156. for count, in cur:
  157. return count
  158. return 0
  159. def get_num_fingerprints(self):
  160. """
  161. Returns number of fingerprints the database has fingerprinted.
  162. """
  163. with self.cursor() as cur:
  164. cur.execute(self.SELECT_NUM_FINGERPRINTS)
  165. for count, in cur:
  166. return count
  167. return 0
  168. def set_song_fingerprinted(self, sid):
  169. """
  170. Set the fingerprinted flag to TRUE (1) once a song has been completely
  171. fingerprinted in the database.
  172. """
  173. with self.cursor() as cur:
  174. cur.execute(self.UPDATE_SONG_FINGERPRINTED, (sid,))
  175. def get_songs(self):
  176. """
  177. Return songs that have the fingerprinted flag set TRUE (1).
  178. """
  179. with self.cursor(cursor_type=DictCursor) as cur:
  180. cur.execute(self.SELECT_SONGS)
  181. for row in cur:
  182. yield row
  183. def get_song_by_id(self, sid):
  184. """
  185. Returns song by its ID.
  186. """
  187. with self.cursor(cursor_type=DictCursor) as cur:
  188. cur.execute(self.SELECT_SONG, (sid,))
  189. return cur.fetchone()
  190. def insert(self, hash, sid, offset):
  191. """
  192. Insert a (sha1, song_id, offset) row into database.
  193. """
  194. with self.cursor() as cur:
  195. cur.execute(self.INSERT_FINGERPRINT, (hash, sid, offset))
  196. def insert_song(self, songname, length, file_hash):
  197. """
  198. Inserts song in the database and returns the ID of the inserted record.
  199. """
  200. with self.cursor() as cur:
  201. cur.execute(self.INSERT_SONG, (songname, length, file_hash))
  202. return cur.lastrowid
  203. def query(self, hash):
  204. """
  205. Return all tuples associated with hash.
  206. If hash is None, returns all entries in the
  207. database (be careful with that one!).
  208. """
  209. # select all if no key
  210. query = self.SELECT_ALL if hash is None else self.SELECT
  211. with self.cursor() as cur:
  212. cur.execute(query)
  213. for sid, offset in cur:
  214. yield (sid, offset)
  215. def get_iterable_kv_pairs(self):
  216. """
  217. Returns all tuples in database.
  218. """
  219. return self.query(None)
  220. def insert_hashes(self, sid, hashes):
  221. """
  222. Insert series of hash => song_id, offset
  223. values into the database.
  224. """
  225. values = []
  226. for hash, offset in hashes:
  227. values.append((hash, sid, offset))
  228. with self.cursor() as cur:
  229. for split_values in grouper(values, 1000):
  230. cur.executemany(self.INSERT_FINGERPRINT, split_values)
  231. def return_matches(self, hashes, ads_filter=None):
  232. """
  233. Return the (song_id, offset_diff) tuples associated with
  234. a list of (sha1, sample_offset) values.
  235. """
  236. # Create a dictionary of hash => offset pairs for later lookups
  237. mapper = {}
  238. for hash, offset in hashes:
  239. mapper[hash.upper()] = offset
  240. # Get an iteratable of all the hashes we need
  241. values = mapper.keys()
  242. with self.cursor() as cur:
  243. for split_values in grouper(values, 1000):
  244. # Create our IN part of the query
  245. if ads_filter is None:
  246. query = self.SELECT_MULTIPLE
  247. query = query % ', '.join(['UNHEX(%s)'] * len(split_values))
  248. else:
  249. query = self.SELECT_MULTIPLE_FILTER
  250. query = query % (', '.join(['UNHEX(%s)'] * len(split_values)), ",".join(ads_filter))
  251. #if ads_filter:
  252. #query = query + " and id in (" + ",".join(ads_filter) + ")"
  253. cur.execute(query, split_values)
  254. for hash, sid, offset in cur:
  255. # (sid, db_offset - song_sampled_offset)
  256. yield (sid, offset - mapper[hash], offset)
  257. def __getstate__(self):
  258. return (self._options,)
  259. def __setstate__(self, state):
  260. self._options, = state
  261. self.cursor = cursor_factory(**self._options)
  262. def grouper(iterable, n, fillvalue=None):
  263. args = [iter(iterable)] * n
  264. return (filter(None, values) for values
  265. in izip_longest(fillvalue=fillvalue, *args))
  266. def cursor_factory(**factory_options):
  267. def cursor(**options):
  268. options.update(factory_options)
  269. return Cursor(**options)
  270. return cursor
  271. class Cursor(object):
  272. """
  273. Establishes a connection to the database and returns an open cursor.
  274. ```python
  275. # Use as context manager
  276. with Cursor() as cur:
  277. cur.execute(query)
  278. ```
  279. """
  280. _cache = Queue.Queue(maxsize=5)
  281. def __init__(self, cursor_type=mysql.cursors.Cursor, **options):
  282. super(Cursor, self).__init__()
  283. try:
  284. conn = self._cache.get_nowait()
  285. except Queue.Empty:
  286. conn = mysql.connect(**options)
  287. else:
  288. # Ping the connection before using it from the cache.
  289. conn.ping(True)
  290. self.conn = conn
  291. self.conn.autocommit(False)
  292. self.cursor_type = cursor_type
  293. @classmethod
  294. def clear_cache(cls):
  295. cls._cache = Queue.Queue(maxsize=5)
  296. def __enter__(self):
  297. self.cursor = self.conn.cursor(self.cursor_type)
  298. return self.cursor
  299. def __exit__(self, extype, exvalue, traceback):
  300. # if we had a MySQL related error we try to rollback the cursor.
  301. if extype is mysql.MySQLError:
  302. self.cursor.rollback()
  303. self.cursor.close()
  304. self.conn.commit()
  305. # Put it back on the queue
  306. try:
  307. self._cache.put_nowait(self.conn)
  308. except Queue.Full:
  309. self.conn.close()