"""
Finalfusion Embeddings
"""
import heapq
from dataclasses import field, dataclass
from os import PathLike
from typing import Optional, Tuple, List, Union, Any, Iterator, Set

import numpy as np

from finalfusion.io import Chunk, Header, _read_chunk_header, ChunkIdentifier, \
    FinalfusionFormatError, _read_required_chunk_header
from finalfusion.metadata import Metadata
from finalfusion.norms import Norms
from finalfusion.storage import Storage, NdArray, QuantizedArray
from finalfusion.vocab import Vocab, SimpleVocab, FinalfusionBucketVocab, FastTextVocab, \
    ExplicitVocab


class Embeddings:  # pylint: disable=too-many-instance-attributes
    """
    Embeddings class.

    Embeddings always contain a :class:`~finalfusion.storage.storage.Storage`
    and :class:`~finalfusion.vocab.vocab.Vocab`. Optional chunks are
    :class:`~finalfusion.norms.Norms` corresponding to the embeddings of the
    in-vocab tokens and :class:`~finalfusion.metadata.Metadata`.

    Embeddings can be retrieved through three methods:

    1. :meth:`Embeddings.embedding` allows to provide a default value and
       returns this value if no embedding could be found.
    2. :meth:`Embeddings.__getitem__` retrieves an embedding for the query but
       raises an exception if it cannot retrieve an embedding.
    3. :meth:`Embeddings.embedding_with_norm` requires a
       :class:`~finalfusion.norms.Norms` chunk and returns an embedding
       together with the corresponding L2 norm.

    Embeddings are composed of the 4 chunk types:

    1. :class:`~finalfusion.storage.storage.Storage` *(required)*:

       * :class:`~finalfusion.storage.ndarray.NdArray`
       * :class:`~finalfusion.storage.ndarray.QuantizedArray`

    2. :class:`~finalfusion.vocab.vocab.Vocab` *(required)*:

       * :class:`~finalfusion.vocab.simple_vocab.SimpleVocab`
       * :class:`~finalfusion.vocab.subword.FinalfusionBucketVocab`
       * :class:`~finalfusion.vocab.subword.FastTextVocab`
       * :class:`~finalfusion.vocab.subword.ExplicitVocab`

    3. :class:`~finalfusion.metadata.Metadata`
    4. :class:`~finalfusion.norms.Norms`

    Examples
    --------
    >>> storage = NdArray(np.float32(np.random.rand(2, 10)))
    >>> vocab = SimpleVocab(["Some", "words"])
    >>> metadata = Metadata({"Some": "value", "numerical": 0})
    >>> norms = Norms(np.float32(np.random.rand(2)))
    >>> embeddings = Embeddings(storage=storage, vocab=vocab, metadata=metadata, norms=norms)
    >>> embeddings.vocab.words
    ['Some', 'words']
    >>> np.allclose(embeddings["Some"], storage[0])
    True
    >>> try:
    ...     embeddings["oov"]
    ... except KeyError:
    ...     True
    True
    >>> _, n = embeddings.embedding_with_norm("Some")
    >>> np.isclose(n, norms[0])
    True
    >>> embeddings.metadata
    {'Some': 'value', 'numerical': 0}
    """

    def __init__(  # pylint: disable=too-many-arguments
            self,
            storage: Storage,
            vocab: Vocab,
            norms: Optional[Norms] = None,
            metadata: Optional[Metadata] = None,
            origin: str = "<memory>"):
        """
        Initialize Embeddings.

        Initializes Embeddings with the given chunks.

        :Conditions:
            The following conditions need to hold if the respective chunks are
            passed:

            * Chunks need to have the expected type.
            * ``vocab.upper_bound == storage.shape[0]``
            * ``len(vocab) == len(norms)``
            * ``len(norms) <= storage.shape[0]``

        Parameters
        ----------
        storage : Storage
            Embeddings Storage.
        vocab : Vocab
            Embeddings Vocabulary.
        norms : Norms, optional
            Embeddings Norms.
        metadata : Metadata, optional
            Embeddings Metadata.
        origin : str, optional
            Origin of the embeddings, e.g. file name.

        Raises
        ------
        AssertionError
            If any of the conditions don't hold.
        """
        Embeddings._check_requirements(storage, vocab, norms, metadata)
        self._storage = storage
        self._vocab = vocab
        self._norms = norms
        self._metadata = metadata
        self._origin = origin

    def __getitem__(self, item: str) -> np.ndarray:
        """
        Returns an embedding.

        Parameters
        ----------
        item : str
            The query item.

        Returns
        -------
        embedding : numpy.ndarray
            The embedding.

        Raises
        ------
        KeyError
            If no embedding could be retrieved.

        See Also
        --------
        :func:`~Embeddings.embedding`
        :func:`~Embeddings.embedding_with_norm`
        """
        # No need to check for None: Vocab raises KeyError if it can't
        # produce indices.
        idx = self._vocab[item]
        return self._embedding(idx)[0]

    def embedding(self,
                  word: str,
                  out: Optional[np.ndarray] = None,
                  default: Optional[np.ndarray] = None
                  ) -> Optional[np.ndarray]:
        """
        Embedding lookup.

        Looks up the embedding for the input word.

        If an `out` array is specified, the embedding is written into the
        array.

        If it is not possible to retrieve an embedding for the input word, the
        `default` value is returned. This defaults to `None`. An embedding
        cannot be retrieved if the vocabulary cannot provide an index for
        `word`.

        This method never fails. If you do not provide a default value, check
        the return value for None. ``out`` is left untouched if no embedding
        can be found and ``default`` is None.

        Parameters
        ----------
        word : str
            The query word.
        out : numpy.ndarray, optional
            Optional output array to write the embedding into.
        default : numpy.ndarray, optional
            Optional default value to return if no embedding can be retrieved.
            Defaults to None.

        Returns
        -------
        embedding : numpy.ndarray, optional
            The retrieved embedding or the default value.

        Examples
        --------
        >>> matrix = np.float32(np.random.rand(2, 10))
        >>> storage = NdArray(matrix)
        >>> vocab = SimpleVocab(["Some", "words"])
        >>> embeddings = Embeddings(storage=storage, vocab=vocab)
        >>> np.allclose(embeddings.embedding("Some"), matrix[0])
        True
        >>> # default value is None
        >>> embeddings.embedding("oov") is None
        True
        >>> # It's possible to specify a default value.
        >>> default = embeddings.embedding("oov", default=storage[0])
        >>> np.allclose(default, storage[0])
        True
        >>> # Embeddings can be written to an output buffer.
        >>> out = np.zeros(10, dtype=np.float32)
        >>> out2 = embeddings.embedding("Some", out=out)
        >>> out is out2
        True
        >>> np.allclose(out, matrix[0])
        True

        See Also
        --------
        :func:`~Embeddings.embedding_with_norm`
        :func:`~Embeddings.__getitem__`
        """
        idx = self._vocab.idx(word)
        if idx is None:
            if out is not None and default is not None:
                out[:] = default
                return out
            return default
        return self._embedding(idx, out)[0]

    def embedding_with_norm(self,
                            word: str,
                            out: Optional[np.ndarray] = None,
                            default: Optional[Tuple[np.ndarray, float]] = None
                            ) -> Optional[Tuple[np.ndarray, float]]:
        """
        Embedding lookup with norm.

        Looks up the embedding for the input word together with its norm.

        If an `out` array is specified, the embedding is written into the
        array.

        If it is not possible to retrieve an embedding for the input word, the
        `default` value is returned. This defaults to `None`. An embedding
        cannot be retrieved if the vocabulary cannot provide an index for
        `word`.

        This method raises a TypeError if norms are not set.

        Parameters
        ----------
        word : str
            The query word.
        out : numpy.ndarray, optional
            Optional output array to write the embedding into.
        default : Tuple[numpy.ndarray, float], optional
            Optional default value to return if no embedding can be retrieved.
            Defaults to None.

        Returns
        -------
        (embedding, norm) : Tuple[numpy.ndarray, float], optional
            Tuple with the retrieved embedding or the default value at the
            first index and the norm at the second index.

        See Also
        --------
        :func:`~Embeddings.embedding`
        :func:`~Embeddings.__getitem__`
        """
        if self._norms is None:
            raise TypeError("embeddings don't contain norms chunk")
        idx = self._vocab.idx(word)
        if idx is None:
            if out is not None and default is not None:
                out[:] = default[0]
                return out, default[1]
            return default
        # Declare the norm as Any: self._embedding returns Optional[float],
        # but norms are guaranteed to be present here, so the norm is a float,
        # not Optional[float].
        val = self._embedding(idx, out)  # type: Tuple[np.ndarray, Any]
        return val

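    # A minimal usage sketch for embedding_with_norm, assuming ``embeddings``
    # carries a Norms chunk (as in the class docstring above):
    #
    #     vec, norm = embeddings.embedding_with_norm("Some")
    #     res = embeddings.embedding_with_norm("oov")  # None: no default given
    #
    # Without a Norms chunk, the call raises TypeError instead.
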
    @property
    def dims(self) -> int:
        """
        Get the embedding dimensionality.

        Returns
        -------
        dims : int
            Embedding dimensionality.
        """
        return self.storage.shape[1]

    @property
    def n_words(self) -> int:
        """
        Get the number of known words.

        Returns
        -------
        n_words : int
            Number of known words.
        """
        return len(self.vocab)

    @property
    def storage(self) -> Storage:
        """
        Get the :class:`~finalfusion.storage.storage.Storage`.

        Returns
        -------
        storage : Storage
            The embeddings storage.
        """
        return self._storage

    @property
    def vocab(self) -> Vocab:
        """
        The :class:`~finalfusion.vocab.vocab.Vocab`.

        Returns
        -------
        vocab : Vocab
            The vocabulary.
        """
        return self._vocab

    @property
    def norms(self) -> Optional[Norms]:
        """
        The :class:`~finalfusion.norms.Norms`.

        :Getter: Returns None or the Norms.
        :Setter: Set the Norms.

        Returns
        -------
        norms : Norms, optional
            The Norms or None.

        Raises
        ------
        AssertionError
            If ``embeddings.storage.shape[0] < len(embeddings.norms)`` or
            ``len(embeddings.norms) != len(embeddings.vocab)``.
        TypeError
            If ``norms`` is neither Norms nor None.
        """
        return self._norms

    @norms.setter
    def norms(self, norms: Optional[Norms]):
        if norms is None:
            self._norms = None
        else:
            Embeddings._norms_compat(self.storage, self.vocab, norms)
            self._norms = norms

    @property
    def metadata(self) -> Optional[Metadata]:
        """
        The :class:`~finalfusion.metadata.Metadata`.

        :Getter: Returns None or the Metadata.
        :Setter: Set the Metadata.

        Returns
        -------
        metadata : Metadata, optional
            The Metadata or None.

        Raises
        ------
        TypeError
            If ``metadata`` is neither Metadata nor None.
        """
        return self._metadata

    @metadata.setter
    def metadata(self, metadata: Optional[Metadata]):
        if metadata is None:
            self._metadata = None
        elif isinstance(metadata, Metadata):
            self._metadata = metadata
        else:
            raise TypeError("Expected 'None' or 'Metadata'.")

    @property
    def origin(self) -> str:
        """
        The origin of the embeddings.

        Returns
        -------
        origin : str
            Origin of the embeddings, e.g. file name.
        """
        return self._origin

    def chunks(self) -> List[Chunk]:
        """
        Get the Embeddings Chunks as a list.

        The Chunks are ordered in the expected serialization order:

        1. Metadata (optional)
        2. Vocabulary
        3. Storage
        4. Norms (optional)

        Returns
        -------
        chunks : List[Chunk]
            List of embeddings chunks.
        """
        chunks = []  # type: List[Chunk]
        if self.metadata is not None:
            chunks.append(self.metadata)
        chunks.append(self.vocab)
        chunks.append(self.storage)
        if self.norms is not None:
            chunks.append(self.norms)
        return chunks

    def write(self, file: Union[str, bytes, int, PathLike]):
        """
        Write the Embeddings to the given file.

        Writes the Embeddings to a finalfusion file at the given path.

        Parameters
        ----------
        file : str, bytes, int, PathLike
            Path of the output file.
        """
        with open(file, 'wb') as outf:
            chunks = self.chunks()
            header = Header([chunk.chunk_identifier() for chunk in chunks])
            header.write_chunk(outf)
            for chunk in chunks:
                chunk.write_chunk(outf)

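    # Round-trip sketch for write; the file name is illustrative:
    #
    #     embeddings.write("vectors.fifu")
    #     restored = load_finalfusion("vectors.fifu")
    #     assert np.allclose(restored["Some"], embeddings["Some"])
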
    def bucket_to_explicit(self) -> 'Embeddings':
        """
        Bucket to explicit Embeddings conversion.

        Multiple embeddings can still map to the same bucket, but all buckets
        that are not indexed by in-vocabulary n-grams are eliminated. This can
        have a big impact on the size of the embedding matrix.

        Metadata is **not** copied to the new embeddings since it doesn't
        reflect the changes. You can manually set the metadata and update the
        values accordingly.

        Returns
        -------
        embeddings : Embeddings
            Embeddings with an ExplicitVocab instead of a hash-based
            vocabulary.

        Raises
        ------
        TypeError
            If the current vocabulary is not a hash-based vocabulary
            (FinalfusionBucketVocab or FastTextVocab).
        """
        bucket_vocabs = (FastTextVocab, FinalfusionBucketVocab)
        if not isinstance(self.vocab, bucket_vocabs):
            raise TypeError(
                "Only bucketed embeddings can be converted to explicit.")
        vocab = self.vocab.to_explicit()
        storage = np.zeros((vocab.upper_bound, self._storage.shape[1]),
                           dtype=np.float32)
        storage[:len(vocab)] = self._storage[:len(vocab)]
        for ngram in vocab.subword_indexer:
            storage[len(vocab) + vocab.subword_indexer[ngram]] = self._storage[
                len(vocab) + self.vocab.subword_indexer(ngram)]
        return Embeddings(vocab=vocab,
                          storage=NdArray(storage),
                          norms=self.norms)

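    # Conversion sketch, assuming ``embeddings`` has a bucketed vocabulary
    # (FinalfusionBucketVocab or FastTextVocab); the file name is illustrative:
    #
    #     explicit = embeddings.bucket_to_explicit()
    #     assert isinstance(explicit.vocab, ExplicitVocab)
    #     explicit.write("explicit.fifu")
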
    def analogy(  # pylint: disable=too-many-arguments
            self,
            word1: str,
            word2: str,
            word3: str,
            k: int = 1,
            skip: Optional[Set[str]] = None
    ) -> Optional[List['SimilarityResult']]:
        """
        Perform an analogy query.

        This method returns words that are close in vector space to the
        analogy query `word1` is to `word2` as `word3` is to `?`. More
        concretely, it searches embeddings that are similar to:

        ``embedding(word2) - embedding(word1) + embedding(word3)``

        Words specified in ``skip`` are not considered as answers. If ``skip``
        is None, the query words ``word1``, ``word2`` and ``word3`` are
        excluded.

        At most, ``k`` results are returned. ``None`` is returned if an
        embedding cannot be computed for one of the query words.

        Parameters
        ----------
        word1 : str
            The first query word: ``word1`` is to ``word2``...
        word2 : str
            The second query word.
        word3 : str
            The third query word: ...as ``word3`` is to the returned answers.
        k : int
            Number of answers to return, defaults to 1.
        skip : Set[str], optional
            Set of strings which should not be considered as answers. Defaults
            to ``None``, which excludes the query strings. To allow the query
            strings as answers, pass an empty set.

        Returns
        -------
        answers : List[SimilarityResult], optional
            List of answers. None if an embedding is missing for one of the
            query words.
        """
        embed_a = self.embedding(word1)
        embed_b = self.embedding(word2)
        embed_c = self.embedding(word3)
        if embed_a is None or embed_b is None or embed_c is None:
            return None
        diff = embed_b - embed_a
        embed_d = embed_c + diff
        embed_d /= np.linalg.norm(embed_d)
        return self._similarity(
            embed_d, k, {word1, word2, word3} if skip is None else skip)

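    # Analogy sketch; the query words are illustrative and need to be known to
    # the vocabulary for a result:
    #
    #     answers = embeddings.analogy("Berlin", "Germany", "Paris", k=1)
    #     if answers is not None:
    #         print(answers[0].word, answers[0].similarity)
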
    def word_similarity(self, query: str, k: int = 10
                        ) -> Optional[List['SimilarityResult']]:
        """
        Retrieves the nearest neighbors of the query string.

        The similarity between the embedding of the query and other embeddings
        is defined by the dot product of the embeddings. If the vectors are
        unit vectors, this is the cosine similarity.

        At most, ``k`` results are returned.

        Parameters
        ----------
        query : str
            The query string.
        k : int
            The number of neighbors to return, defaults to 10.

        Returns
        -------
        neighbours : List[SimilarityResult], optional
            List of neighbours with their similarity measure. None if no
            embedding can be found for ``query``.
        """
        embed = self.embedding(query)
        if embed is None:
            return None
        return self._similarity(embed, k, {query})

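    # Nearest-neighbour sketch; the query word is illustrative:
    #
    #     neighbours = embeddings.word_similarity("Berlin", k=5)
    #     if neighbours is not None:
    #         for result in neighbours:
    #             print(result.word, result.similarity)
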
    def embedding_similarity(self,
                             query: np.ndarray,
                             k: int = 10,
                             skip: Optional[Set[str]] = None
                             ) -> Optional[List['SimilarityResult']]:
        """
        Retrieves the nearest neighbors of the query embedding.

        The similarity between the query embedding and other embeddings is
        defined by the dot product of the embeddings. If the vectors are unit
        vectors, this is the cosine similarity.

        At most, ``k`` results are returned.

        Parameters
        ----------
        query : numpy.ndarray
            The query array.
        k : int
            The number of neighbors to return, defaults to 10.
        skip : Set[str], optional
            Set of strings that should not be considered as neighbours.

        Returns
        -------
        neighbours : List[SimilarityResult], optional
            List of neighbours with their similarity measure.
        """
        return self._similarity(query, k, set() if skip is None else skip)

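    # Sketch for querying with an arbitrary vector, e.g. the mean of two word
    # embeddings; the words are illustrative:
    #
    #     query = (embeddings["Berlin"] + embeddings["Paris"]) / 2
    #     for result in embeddings.embedding_similarity(query, k=5):
    #         print(result.word, result.similarity)
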
    def __contains__(self, item):
        return item in self._vocab

    def __iter__(self) -> Union[Iterator[Tuple[str, np.ndarray]],
                                Iterator[Tuple[str, np.ndarray, float]]]:
        if self._norms is not None:
            return zip(self._vocab, self._storage, self._norms)
        return zip(self._vocab, self._storage)

    def __repr__(self):
        return f"{type(self).__name__}(\n" \
               f"\tstorage_type={type(self.storage).__name__},\n" \
               f"\tvocab_type={type(self.vocab).__name__},\n" \
               f"\thas_metadata={self.metadata is not None},\n" \
               f"\thas_norms={self.norms is not None},\n" \
               f"\tn_words={self.n_words},\n" \
               f"\tdims={self.dims},\n" \
               f"\torigin='{self.origin}',\n" \
               f")"

    def _similarity(self, query: np.ndarray, k: int,
                    skips: Set[str]) -> List['SimilarityResult']:
        words = self.storage[:len(self.vocab)]  # type: np.ndarray
        sims = words.dot(query)
        skip_indices = set(idx
                           for idx in (self.vocab.word_index.get(skip)
                                       for skip in skips)
                           if idx is not None)
        partition = sims.argpartition(
            -k - len(skip_indices))[-k - len(skip_indices):]
        heap = []  # type: List[SimilarityResult]
        for idx in partition:
            if idx not in skip_indices:
                heapq.heappush(
                    heap, SimilarityResult(self.vocab.words[idx], sims[idx]))
        return heapq.nlargest(k, heap)

    def _embedding(self,
                   idx: Union[int, List[int]],
                   out: Optional[np.ndarray] = None
                   ) -> Tuple[np.ndarray, Optional[float]]:
        res = self._storage[idx]  # type: np.ndarray
        if res.ndim == 1:
            # Single index: return the stored embedding and its stored norm.
            if out is not None:
                out[:] = res
            else:
                out = res
            if self._norms is not None:
                norm = self._norms[idx]  # type: Optional[float]
            else:
                norm = None
        else:
            # Multiple (subword) indices: sum the embeddings and normalize.
            out = np.add.reduce(res, 0, out=out, keepdims=False)
            norm = np.linalg.norm(out)
            out /= norm
        return out, norm

    @staticmethod
    def _check_requirements(storage: Storage, vocab: Vocab,
                            norms: Optional[Norms],
                            metadata: Optional[Metadata]):
        assert isinstance(storage, Storage), \
            "storage is required to be a Storage"
        assert isinstance(vocab, Vocab), "vocab is required to be a Vocab"
        assert storage.shape[0] == vocab.upper_bound, \
            "Number of embeddings needs to be equal to vocab's upper_bound"
        if norms is not None:
            Embeddings._norms_compat(storage, vocab, norms)
        assert metadata is None or isinstance(metadata, Metadata), \
            "metadata is required to be Metadata"

    @staticmethod
    def _norms_compat(storage: Storage, vocab: Vocab, norms: Norms):
        assert isinstance(norms, Norms), "norms are required to be Norms"
        assert storage.shape[0] >= len(norms), \
            "Number of embeddings needs to be greater than or equal to " \
            "number of norms."
        assert len(vocab) == len(norms), \
            "Vocab length needs to be equal to number of norms."


def load_finalfusion(file: Union[str, bytes, int, PathLike],
                     mmap: bool = False) -> Embeddings:
    """
    Read embeddings from a file in finalfusion format.

    Parameters
    ----------
    file : str, bytes, int, PathLike
        Path to a file with embeddings in finalfusion format.
    mmap : bool
        Toggles memory mapping the storage buffer.

    Returns
    -------
    embeddings : Embeddings
        The embeddings from the input file.
    """
    with open(file, 'rb') as inf:
        _ = Header.read_chunk(inf)
        chunk_id, _ = _read_required_chunk_header(inf)
        norms = None
        metadata = None

        if chunk_id == ChunkIdentifier.Metadata:
            metadata = Metadata.read_chunk(inf)
            chunk_id, _ = _read_required_chunk_header(inf)

        if chunk_id == ChunkIdentifier.SimpleVocab:
            vocab = SimpleVocab.read_chunk(inf)  # type: Vocab
        elif chunk_id == ChunkIdentifier.BucketSubwordVocab:
            vocab = FinalfusionBucketVocab.read_chunk(inf)
        elif chunk_id == ChunkIdentifier.FastTextSubwordVocab:
            vocab = FastTextVocab.read_chunk(inf)
        elif chunk_id == ChunkIdentifier.ExplicitSubwordVocab:
            vocab = ExplicitVocab.read_chunk(inf)
        else:
            raise FinalfusionFormatError(
                f'Expected vocab chunk, not {str(chunk_id)}')

        chunk_id, _ = _read_required_chunk_header(inf)
        if chunk_id == ChunkIdentifier.NdArray:
            storage = NdArray.load(inf, mmap)  # type: Storage
        elif chunk_id == ChunkIdentifier.QuantizedArray:
            storage = QuantizedArray.load(inf, mmap)
        else:
            raise FinalfusionFormatError(
                f'Expected storage chunk, not {str(chunk_id)}')

        maybe_chunk_id = _read_chunk_header(inf)
        if maybe_chunk_id is not None:
            if maybe_chunk_id[0] == ChunkIdentifier.NdNorms:
                norms = Norms.read_chunk(inf)
            else:
                raise FinalfusionFormatError(
                    f'Expected norms chunk, not {str(maybe_chunk_id[0])}')

        return Embeddings(storage, vocab, norms, metadata, inf.name)


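# Loading sketch; the path is illustrative. ``mmap=True`` memory-maps the
# storage buffer instead of reading it into memory:
#
#     embeddings = load_finalfusion("vectors.fifu", mmap=True)
#     vec = embeddings.embedding("word")

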
@dataclass(order=True)
class SimilarityResult:
    """
    Container for a Similarity result.

    The word can be accessed through ``result.word``, the similarity through
    ``result.similarity``.
    """
    word: str = field(compare=False)
    similarity: float


__all__ = ['Embeddings', 'SimilarityResult', 'load_finalfusion']