Source code for xon_db
import os
import re
import urllib.parse
import time
import logging
from collections import UserDict
from fnmatch import fnmatch
from xon_db.crc import crc_block
# Matches one ``\key\value`` pair inside a database line; neither the key nor
# the value may contain a backslash or a double quote.
KEYPAIR_RE = re.compile(r'\\([^\\"]+)\\([^\\"]+)')
# Default number of buckets (one output line per bucket) used when writing a database.
DB_BUCKETS = 8192
# Module-level logger.
logger = logging.getLogger(__name__)
class XonoticDBException(Exception):
    """
    Raised when an operation on a Xonotic DB cannot be completed
    """
class XonoticDB(UserDict):
    """
    A class for reading and writing xonotic databases. Provides dict-like interface
    """

    def __init__(self, data: str, db_buckets=DB_BUCKETS, hashfunc=crc_block):
        """
        :param data: database contents
        :param db_buckets: number of buckets (for writing)
        :param hashfunc: default hashfunc. Change with care
        """
        self.db_buckets = db_buckets
        self.hashfunc = hashfunc
        super().__init__()
        # The first line is skipped: save() writes the bucket count there,
        # so it never carries key/value pairs.
        for line in data.splitlines()[1:]:
            self.parse_line(line)

    def parse_line(self, line: str):
        """
        Parse a line from database and store every key/value pair found in it

        :param line: line to parse
        """
        for match in KEYPAIR_RE.finditer(line):
            key, quoted_value = match.groups()
            # Values are stored percent-encoded (see save()); keys are stored as-is.
            self[key] = urllib.parse.unquote(quoted_value)

    @classmethod
    def load(cls, file):
        """
        Load a database from an open file

        :param file: file object opened for reading text
        :return: XonoticDB instance
        """
        return cls(file.read())

    @classmethod
    def load_path(cls, file_path):
        """
        Load a database from the specified file path

        :param file_path: file path
        :return: XonoticDB instance
        """
        with open(file_path, 'r') as f:
            return cls.load(f)

    @staticmethod
    def get_backup_file_name(file_path):
        """
        Get a file name for backup file

        :param file_path: file path
        :return: backup file name (the original path suffixed with the current timestamp and ``.bak``)
        """
        return file_path + '.%s.bak' % time.time()

    def save(self, file_path):
        """
        Write database to a file. If a file with the specified name exists it's backed up

        :param file_path: file path
        :raises XonoticDBException: if the path exists but is not a regular file
        """
        if os.path.isfile(file_path):
            # Copy the current contents aside before overwriting them.
            with open(self.get_backup_file_name(file_path), 'w') as backup:
                with open(file_path, 'r') as original:
                    backup.write(original.read())
        elif os.path.exists(file_path):
            # Interpolate the path into the message; passing it as a second
            # exception argument would leave a literal '%s' in the text.
            raise XonoticDBException('%s exists and is not a file. Cannot write to it.' % (file_path,))

        # One output line per bucket; each pair is appended to the line
        # selected by the hash of its key.
        lines = [''] * self.db_buckets
        for key, value in self.items():
            bucket = self.hashfunc(key) % self.db_buckets
            lines[bucket] += r'\%s\%s' % (key, urllib.parse.quote(value))

        with open(file_path, 'w') as f:
            # Header line: the bucket count (skipped again when loading).
            f.write('%d\n' % self.db_buckets)
            f.writelines(line + '\n' for line in lines)

    def filter(self, key_pattern='*', is_regex=False):
        """
        Filter database key by pattern

        :param key_pattern: pattern (either glob, regex string or compiled regex)
        :param is_regex: should be True if the pattern is regex
        :return: iterator of (key, value) pairs
        """
        if is_regex:
            # Accept both pattern strings and already compiled patterns.
            regex = re.compile(key_pattern) if isinstance(key_pattern, str) else key_pattern
            matches = regex.match
        else:
            def matches(key):
                return fnmatch(key, key_pattern)

        for key, value in self.items():
            if matches(key):
                yield key, value

    def remove_cts_record(self, map, position):
        """
        Remove a CTS record and shift the records stored after it one position down

        :param map: map name (the prefix of the record keys)
        :param position: position of the record to remove
        :return: None
        """
        key_templates = (
            '%s/cts100record/crypto_idfp%s',
            '%s/cts100record/time%s',
        )
        # NOTE(review): position 100, if present, is copied into 99 but never
        # cleared itself -- presumably the game stores at most 99 records; confirm.
        for pos in range(position, 100):
            for template in key_templates:
                current = template % (map, pos)
                following = template % (map, pos + 1)
                # Drop the current slot, then refill it from the next one if any.
                self.pop(current, None)
                if following in self:
                    self[current] = self[following]

    def remove_all_cts_records_by(self, crypto_idfp):
        """
        Remove all CTS records from the specified player

        :param crypto_idfp: player identifier stored as the value of the ``crypto_idfp`` record keys
        :return: None
        """
        regex = re.compile(r'(.+)/cts100record/crypto_idfp(\d+)')
        # Collect first: removing records mutates the mapping being iterated.
        to_remove = []
        for key, value in self.filter(regex, is_regex=True):
            if value == crypto_idfp:
                match = regex.match(key)
                to_remove.append((match.group(1), int(match.group(2))))
        # Each removal shifts later positions down by one, so remove the highest
        # positions first to keep the remaining collected positions valid.
        to_remove.sort(key=lambda record: record[1], reverse=True)
        for map_name, position in to_remove:
            self.remove_cts_record(map_name, position)