"""
fastText IO compat module.
"""
import sys
from os import PathLike
from typing import Union, BinaryIO, cast, List
import numpy as np
from finalfusion._util import _normalize_matrix
from finalfusion.embeddings import Embeddings
from finalfusion.io import _read_required_binary, _write_binary, _serialize_array_as_le
from finalfusion.metadata import Metadata
from finalfusion.storage import NdArray
from finalfusion.subword import FastTextIndexer
from finalfusion.vocab import FastTextVocab, Vocab, SimpleVocab
_FT_MAGIC = 793_712_314
def load_fasttext(file: Union[str, bytes, int, PathLike],
                  lossy: bool = False) -> Embeddings:
"""
Read embeddings from a file in fastText format.

    The returned embeddings have a FastTextVocab, NdArray storage and a Norms chunk.

    Loading embeddings with this method precomputes an embedding for each word by averaging all
    of its subword embeddings together with the distinct word vector. Additionally, all precomputed
    vectors are l2-normalized and the corresponding norms are stored in the Norms chunk. The subword
    embeddings are **not** l2-normalized.

    Parameters
    ----------
    file : str, bytes, int, PathLike
        Path to a file with embeddings in fastText binary format.
    lossy : bool
        If set to true, malformed UTF-8 sequences in words will be replaced with the `U+FFFD`
        REPLACEMENT character.

    Returns
    -------
embeddings : Embeddings
The embeddings from the input file.
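
    Examples
    --------
    A minimal usage sketch; ``/path/to/model.bin`` is a placeholder for an existing
    fastText binary model, not a file shipped with finalfusion.

    >>> embeddings = load_fasttext("/path/to/model.bin")  # doctest: +SKIP
    >>> # word vectors are precomputed and l2-normalized at load time
    >>> vec = embeddings.embedding("example")  # doctest: +SKIP
    >>> vec.shape == (embeddings.dims,)  # doctest: +SKIP
    True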
"""
with open(file, 'rb') as inf:
_read_ft_header(inf)
metadata = _read_ft_cfg(inf)
vocab = _read_ft_vocab(inf, metadata['buckets'], metadata['min_n'],
metadata['max_n'], lossy)
storage = _read_ft_storage(inf, vocab)
norms = _normalize_matrix(storage[:len(vocab)])
return Embeddings(storage=storage,
vocab=vocab,
norms=norms,
metadata=metadata,
origin=inf.name)
def write_fasttext(file: Union[str, bytes, int, PathLike], embeds: Embeddings):
"""
    Write embeddings in fastText format.

    Only embeddings with a fastText vocabulary can be written to fastText format.

    fastText models require values for all config keys. Some of these can be inferred from
    finalfusion models, others are assigned default values:

    * dims: inferred from model
    * window_size: 0
    * epoch: 0
* min_count: 0
* ns: 0
* word_ngrams: 1
* loss: HierarchicalSoftmax
* model: CBOW
* buckets: inferred from model
* min_n: inferred from model
* max_n: inferred from model
* lr_update_rate: 0
* sampling_threshold: 0

    Some information from the original fastText model is lost, e.g.:

    * word frequencies
    * n_tokens

    Embeddings are un-normalized before serialization: if norms are present, each embedding is
    scaled by the associated norm. Additionally, the original state of the embedding matrix is
    restored; precomputation and l2-normalization of word embeddings are undone.

    Parameters
    ----------
file : str, bytes, int, PathLike
Output file
embeds : Embeddings
Embeddings to write
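
    Examples
    --------
    A round-trip sketch; both paths are placeholders for files on your system.

    >>> embeddings = load_fasttext("/path/to/model.bin")  # doctest: +SKIP
    >>> write_fasttext("/path/to/copy.bin", embeddings)  # doctest: +SKIP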
"""
with open(file, 'wb') as outf:
vocab = embeds.vocab
if not isinstance(vocab, FastTextVocab):
raise ValueError(
f'Expected FastTextVocab, not: {type(embeds.vocab).__name__}')
_write_binary(outf, "<ii", _FT_MAGIC, 12)
_write_ft_cfg(outf, embeds.dims, vocab.subword_indexer.n_buckets,
vocab.min_n, vocab.max_n)
_write_ft_vocab(outf, embeds.vocab)
_write_binary(outf, "<?QQ", 0, *embeds.storage.shape)
if isinstance(embeds.vocab, SimpleVocab):
_write_ft_storage_simple(outf, embeds)
else:
_write_ft_storage_subwords(outf, embeds)
_serialize_array_as_le(outf, embeds.storage)
def _read_ft_header(file: BinaryIO):
"""
Helper method to verify version and magic.
"""
magic, version = _read_required_binary(file, "<ii")
if magic != _FT_MAGIC:
raise ValueError(f"Magic should be 793_712_314, not: {magic}")
if version != 12:
raise ValueError(f"Expected version 12, not: {version}")
def _read_ft_cfg(file: BinaryIO) -> Metadata:
"""
Constructs metadata from fastText config.
"""
cfg = list(_read_required_binary(file, "<12id"))
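    # fastText serializes the loss and model enums as 1-based values; map them to names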
losses = ['HierarchicalSoftmax', 'NegativeSampling', 'Softmax']
cfg[6] = losses[cfg[6] - 1]
models = ['CBOW', 'SkipGram', 'Supervised']
cfg[7] = models[cfg[7] - 1]
return Metadata(dict(zip(_FT_REQUIRED_CFG_KEYS, cfg)))
def _read_ft_vocab(file: BinaryIO, buckets: int, min_n: int, max_n: int,
lossy: bool) -> Union[FastTextVocab, SimpleVocab]:
"""
    Helper method to read a vocab from a fastText file.

    Returns a FastTextVocab.
"""
# discard n_words
vocab_size, _n_words, n_labels = _read_required_binary(file, "<iii")
if n_labels:
raise NotImplementedError(
"fastText prediction models are not supported")
# discard n_tokens
_read_required_binary(file, "<q")
prune_idx_size = _read_required_binary(file, "<q")[0]
if prune_idx_size >= 0:
raise NotImplementedError("Pruned vocabs are not supported")
words = [_read_binary_word(file, lossy) for _ in range(vocab_size)]
indexer = FastTextIndexer(buckets, min_n, max_n)
return FastTextVocab(words, indexer)
def _read_binary_word(file: BinaryIO, lossy: bool) -> str:
"""
Helper method to read null-terminated binary strings.
"""
word = bytearray()
while True:
byte = file.read(1)
if byte == b'\x00':
break
if byte == b'':
raise EOFError
word.extend(byte)
# discard frequency
_ = _read_required_binary(file, "<q")
entry_type = _read_required_binary(file, "b")[0]
if entry_type != 0:
raise ValueError(f'Non word entry: {word}')
return word.decode('utf8', errors='replace' if lossy else 'strict')
def _read_ft_storage(file: BinaryIO, vocab: Vocab) -> NdArray:
"""
Helper method to read fastText storage.
If vocab is a SimpleVocab, the matrix is read and returned as is.
If vocab is a FastTextVocab, the word representations are precomputed based
on the vocab.
"""
quantized = _read_required_binary(file, "?")[0]
if quantized:
raise NotImplementedError(
"Quantized storage is not supported for fastText models")
rows, cols = _read_required_binary(file, "<qq")
matrix = np.fromfile(file=file, count=rows * cols,
dtype=np.float32).reshape((rows, cols))
if sys.byteorder == 'big':
matrix.byteswap(inplace=True)
if isinstance(vocab, FastTextVocab):
_precompute_word_vecs(vocab, matrix)
return NdArray(matrix)
def _precompute_word_vecs(vocab: FastTextVocab, matrix: np.ndarray):
"""
Helper method to precompute word vectors.
Averages the distinct word representation and the corresponding ngram
embeddings.
"""
for i, word in enumerate(vocab):
indices = [i]
if isinstance(vocab, FastTextVocab):
subword_indices = cast(
List[int], vocab.subword_indices(word, with_ngrams=False))
indices += subword_indices
matrix[i] = matrix[indices].mean(0, keepdims=False)
def _write_ft_cfg(file: BinaryIO, dims: int, n_buckets: int, min_n: int,
max_n: int):
"""
Helper method to write fastText config.
The following values are used:
* dims: passed as arg
    * window_size: 0
    * epoch: 0
* min_count: 0
* ns: 0
* word_ngrams: 1
* loss: HierarchicalSoftmax
* model: CBOW
* buckets: passed as arg
* min_n: passed as arg
* max_n: passed as arg
* lr_update_rate: 0
* sampling_threshold: 0
"""
# declare some dummy values that we can't get from embeds
cfg = [
dims, # dims
0, # window_size
0, # epoch
0, # mincount
0, # ns
1, # word_ngrams
1, # loss, defaults to hierarchical_softmax
1, # model, defaults to CBOW
n_buckets, # buckets
min_n, # min_n
max_n, # max_n
0, # lr_update_rate
0, # sampling_threshold
]
_write_binary(file, "<12id", *cfg)
def _write_ft_vocab(outf: BinaryIO, vocab: Vocab):
"""
Helper method to write a vocab to fastText.
"""
    # assumes that vocab_size == n_words, since n_labels == 0
_write_binary(outf, "<iii", len(vocab), len(vocab), 0)
# we discard n_tokens, serialize as 0, vocab is not pruned, thus -1
_write_binary(outf, "<qq", 0, -1)
for word in vocab:
outf.write(word.encode("utf-8"))
outf.write(b'\x00')
# we don't store frequency, also set to 0
_write_binary(outf, "<q", 0)
# all entries are words = 0
_write_binary(outf, "b", 0)
def _write_ft_storage_subwords(outf: BinaryIO, embeds: Embeddings):
"""
Helper method to write a storage with subwords.
    Restores the original fastText embedding matrix, i.e. precomputation is
    undone and the embeddings are un-normalized.
"""
vocab = embeds.vocab
assert isinstance(vocab, FastTextVocab)
storage = embeds.storage
norms = embeds.norms
for i, word in enumerate(vocab):
indices = vocab.subword_indices(word)
embed = storage[i] # type: np.ndarray
embed = embed * (len(indices) + 1)
if norms is not None:
embed *= norms[i]
sw_embeds = storage[indices] # type: np.ndarray
embed -= sw_embeds.sum(0, keepdims=False)
_serialize_array_as_le(outf, embed)
_serialize_array_as_le(outf, storage[len(vocab):])
def _write_ft_storage_simple(outf: BinaryIO, embeds: Embeddings):
"""
Helper method to write storage of a simple vocab model.
Unnormalizes embeddings.
"""
storage = embeds.storage
norms = embeds.norms
for i in range(storage.shape[0]):
embed = storage[i] # type: np.ndarray
if norms is not None:
embed = norms[i] * embed
_serialize_array_as_le(outf, embed)
_FT_REQUIRED_CFG_KEYS = [
'dims', 'window_size', 'epoch', 'min_count', 'ns', 'word_ngrams', 'loss',
'model', 'buckets', 'min_n', 'max_n', 'lr_update_rate',
'sampling_threshold'
]
__all__ = ['load_fasttext', 'write_fasttext']