Welcome to tempsdb’s documentation!¶
How does this work?¶
Note
This is about fixed-length data time series.
Data is stored in so-called chunks. Only a chunk's last page can be actively appended to; otherwise a chunk is immutable.
When there is a request to fetch some data, a chunk is loaded into memory. It will not be unloaded automatically; to do that, you must periodically call close_chunks().
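For instance, a sketch of that pattern, using the API documented below (the database path and series name here are made up):
>>> from tempsdb.database import Database
>>> db = Database('mydb')
>>> series = db.get_series('temperature')
>>> # ... serve some reads via series.iterate_range(...) ...
>>> series.close_chunks()    # release chunks that are no longer referred to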
Usage¶
Start off by instantiating a Database object:
- class tempsdb.database.Database(unicode path: str)¶
A basic TempsDB object.
After you're done with it, please call close(). If you forget to, the destructor will do that instead and emit a warning.
- Parameters
path – path to the directory with the database
- Raises
DoesNotExist – database does not exist, use create_database
- Variables
path – path to the directory with the database (str)
- close(self) → int¶ Close this TempsDB database.
- close_all_open_series(self) → int¶ Closes all open series.
- create_series(self, unicode name, int block_size, unsigned long entries_per_chunk, int page_size=4096, bool use_descriptor_based_access=False) → TimeSeries¶ Create a new series.
- Parameters
name – name of the series
block_size – size of the data field
entries_per_chunk – entries per chunk file
page_size – size of a single page. Default is 4096
use_descriptor_based_access – whether to use descriptor based access instead of mmap. Default is False
- Returns
new series
- Raises
ValueError – block size was larger than page_size plus a timestamp
AlreadyExists – series with given name already exists
- create_varlen_series(self, unicode name, list length_profile, int size_struct, unsigned long entries_per_chunk) → VarlenSeries¶ Create a new variable length series.
- Parameters
name – name of the series
length_profile – list of lengths of subsequent chunks
size_struct – how many bytes will be used to store the length; valid entries are 1, 2 and 4
entries_per_chunk – entries per chunk file
- Returns
new variable length series
- Raises
AlreadyExists – series with given name already exists
- get_all_series(self) → list¶ List all series available within this database.
- Returns
a list of series names
- Return type
tp.List[str]
- get_first_entry_for(self, unicode name) → unsigned long long¶ Get the first timestamp stored in a particular series without opening it.
- Parameters
name – series name
- Returns
first timestamp stored in this series
- Raises
DoesNotExist – series does not exist
ValueError – series does not have any data
- get_open_series(self) → list¶ Return all open series.
- Returns
open series
- Return type
tp.List[TimeSeries]
- get_series(self, unicode name: str, bool use_descriptor_based_access=False) → TimeSeries¶ Load and return an existing series.
- Parameters
name – name of the series
use_descriptor_based_access – whether to use descriptor based access instead of mmap, default is False
- Returns
a loaded time series
- Raises
DoesNotExist – series does not exist
- get_varlen_series(self, unicode name) → VarlenSeries¶ Load and return an existing variable length series.
- Parameters
name – name of the series
- Returns
a loaded varlen series
- Raises
DoesNotExist – series does not exist
- register_memory_pressure_manager(self, mpm) → int¶ Register a satella MemoryPressureManager to close chunks if low on memory.
- Parameters
mpm (satella.instrumentation.memory.MemoryPressureManager) – MemoryPressureManager to use
- sync(self) → int¶ Synchronize all the data with the disk.
You can create new databases via:
- tempsdb.database.create_database(unicode path) → Database¶ Creates a new, empty database.
- Parameters
path – path where the DB directory will be put
- Returns
a Database object
- Raises
AlreadyExists – the directory exists
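For example, a fresh database can be bootstrapped like this (the path is illustrative):
>>> from tempsdb.database import create_database
>>> db = create_database('my_db')   # raises AlreadyExists if the directory already exists
>>> db.close()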
Then you can create and retrieve particular series:
- class tempsdb.series.TimeSeries(unicode path: str, unicode name: str, use_descriptor_based_access: bool = False)¶
A single time series. This maps each timestamp (unsigned long long) to a block of data of length block_size.
When you're done with this, please call close(). If you forget to, the destructor will do that instead, and a warning will be emitted.
- Variables
last_entry_ts – timestamp of the last entry added or 0 if no entries yet (int)
last_entry_synced – timestamp of the last synchronized entry (int)
block_size – size of the writable block of data (int)
path – path to the directory containing the series (str)
descriptor_based_access – are all chunks using descriptor-based access? (bool)
name – name of the series (str)
metadata – extra data (tp.Optional[dict])
- append(self, unsigned long long timestamp, bytes data) → int¶ Append an entry.
- Parameters
timestamp – timestamp, must be larger than current last_entry_ts
data – data to write
- Raises
ValueError – Timestamp not larger than previous timestamp or invalid block size
InvalidState – the resource is closed
- append_padded(self, unsigned long long timestamp, bytes data) → int¶ Same as append(), but will accept data shorter than block_size; it will be padded with zeros.
- Parameters
timestamp – timestamp, must be larger than current last_entry_ts
data – data to write
- Raises
ValueError – Timestamp not larger than previous timestamp or invalid block size
InvalidState – the resource is closed
- close(self) → int¶ Close the series.
No further operations can be executed on it afterwards.
- close_chunks(self) → int¶ Close all chunks opened by read requests that are not referred to anymore.
No-op if closed.
- delete(self) → int¶ Erase this series from the disk. The series must be opened to do that.
- Raises
InvalidState – series is not opened
- disable_mmap(self) → int¶ Switches to descriptor-based file access for the entire series, and all chunks open inside.
- enable_mmap(self) → int¶ Switches to mmap-based file access for the entire series, and all chunks open inside.
This will try to enable mmap on every chunk; if mmap fails for a chunk due to recoverable errors, that chunk will remain in descriptor-based mode.
- Raises
Corruption – mmap failed due to an irrecoverable error
- get_current_value(self) → tuple¶ Return the latest value of this series.
- Returns
tuple of (timestamp, value)
- Return type
tp.Tuple[int, bytes]
- Raises
ValueError – series has no data
- iterate_range(self, unsigned long long start, unsigned long long stop) → Iterator¶ Return an iterator through collected data with the given timestamps.
- Parameters
start – timestamp to start at
stop – timestamp to stop at
- Returns
an iterator with the data
- Raises
ValueError – start larger than stop
- mark_synced_up_to(self, unsigned long long timestamp) → int¶ Mark the series as synced up to a particular timestamp.
This will additionally sync the metadata.
- Parameters
timestamp – timestamp of the last synced entry
- open_chunks_mmap_size(self) → unsigned long¶ Calculate how much RAM the mmapped space takes.
- Returns
how much RAM, in bytes, the opened chunks consume
- set_metadata(self, dict new_meta) → int¶ Set a new value for the metadata property. This writes it to the disk.
- Parameters
new_meta – new value of metadata property
- sync(self) → int¶ Synchronize the data kept in memory with that kept on disk.
- Raises
InvalidState – the resource is closed
- trim(self, unsigned long long timestamp) → int¶ Delete all entries earlier than timestamp that are closed.
Note that this will drop entire chunks, so it is possible that some entries will linger on.
This will affect only closed chunks. Chunks that become ready to delete but are closed only after this call will not be deleted; trim() will need to be called again.
- Parameters
timestamp – timestamp to delete entries earlier than
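Putting it together, a minimal sketch of creating a series and appending to it (the name, block size and values are made up):
>>> series = db.create_series('temperature', block_size=8, entries_per_chunk=1000)
>>> series.append(1000, b'\x00' * 8)    # data must be exactly block_size bytes
>>> series.append(2000, b'\x01' * 8)    # timestamps must be strictly increasing
>>> series.sync()
>>> series.close()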
You retrieve their data via Iterators:
- class tempsdb.iterators.Iterator(TimeSeries parent: TimeSeries, start: int, stop: int, chunks: tp.List[Chunk])¶
Iterator that allows iterating through results.
Can be used as a context manager:
>>> with series.iterate_range(0, 5000) as it:
>>>     for timestamp, value in it:
>>>         ...
It will close itself automatically via the destructor if you forget to call close().
At its most basic, this implements an iterator interface, iterating over tp.Tuple[int, bytes]: timestamp and data.
When you're done, call close() to release the resources. A warning will be emitted if the destructor has to call close().
- close(self) → int¶ Close this iterator, release chunks.
It is imperative that you call this, otherwise some chunks might remain in memory. The destructor hooks this, but call it manually as soon as possible.
No-op if iterator is already closed.
- next_item(self) → tuple¶ Return the next element, or None if the list was exhausted.
- Returns
next element
- Return type
tp.Optional[tp.Tuple[int, bytes]]
Appending the data is done via append(). Since time series are allocated in entire pages, your files will be padded to a full page in size. This makes writes quite fast, as in 99.9% of cases it is just a memory operation.
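As a short sketch, continuing the series from the example above (values are made up):
>>> series.append(3000, b'\x02' * 8)     # exactly block_size bytes
>>> series.append_padded(4000, b'\x03')  # shorter payload, zero-padded to block_size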
Exceptions¶
The base TempsDB exception is:
- class tempsdb.exceptions.TempsDBError¶ Base class for TempsDB errors.
The exceptions that inherit from it are:
- class tempsdb.exceptions.DoesNotExist¶ The required resource does not exist.
- class tempsdb.exceptions.Corruption¶ Corruption was detected in the dataset.
- class tempsdb.exceptions.InvalidState¶ An attempt was made to write to a resource that's closed.
- class tempsdb.exceptions.AlreadyExists¶ Provided object already exists.
- class tempsdb.exceptions.StillOpen¶ This resource has outstanding references and cannot be closed.
Chunk¶
For your convenience the class Chunk was also documented, but don't use it directly:
- class tempsdb.chunks.Chunk(parent: tp.Optional[TimeSeries], unicode path: str, page_size: int, use_descriptor_access: bool = False)¶
Represents a single chunk of time series.
This also implements an iterator interface, and will iterate with tp.Tuple[int, bytes], as well as a sequence protocol.
This will try to mmap opened files, but if mmap fails due to insufficient memory, this will use descriptor-based access.
- Parameters
parent – parent time series
path – path to the chunk file
use_descriptor_access – whether to use descriptor based access instead of mmap
- Variables
path – path to the chunk (str)
min_ts – timestamp of the first entry stored (int)
max_ts – timestamp of the last entry stored (int)
block_size – size of the data entries (int)
entries – amount of entries in this chunk (int)
page_size – size of the page (int)
- append(self, unsigned long long timestamp, bytes data) → int¶ Append a record to this chunk.
Might range from very fast (just a memory operation) to quite slow (adding a new page to the file).
Simultaneous writing is not thread-safe.
The timestamp and data are not validated; this is supposed to be handled by TimeSeries.
- Parameters
timestamp – timestamp of the entry
data – data to write
- Raises
InvalidState – chunk is closed
- close(self, bool force=False) → int¶ Close the chunk and release the allocated resources.
- Parameters
force – whether to close the chunk even if it’s open somewhere
- Raises
StillOpen – this chunk has a parent attached and the parent says that this chunk is still being referred to
- delete(self) → int¶ Close and delete this chunk.
- find_left(self, unsigned long long timestamp) → unsigned int¶ Return an index i such that ts[i] <= timestamp and (timestamp - ts[i]) → min.
Used as a bound in searches: you start at this index and finish at find_right().
- Parameters
timestamp – timestamp to look for, must be smaller or equal to largest element in the chunk
- Returns
index such that ts[i] <= timestamp and (timestamp-ts[i]) -> min, or length of the array if timestamp is larger than largest element in this chunk
- find_right(self, unsigned long long timestamp) → unsigned int¶ Return an index i such that ts[i] > timestamp and (ts[i] - timestamp) → min.
Used as a bound in searches: you start at find_left() and finish at this index, inclusive.
- Parameters
timestamp – timestamp to look for
- Returns
index such that ts[i] > timestamp and (ts[i]-timestamp) -> min
- get_byte_of_piece(self, unsigned int index, int byte_index) → int¶ Return a particular byte of the element at the given index.
When the index is negative or larger than block_size, the behaviour is undefined.
- Parameters
index – index of the element
byte_index – index of the byte
- Returns
value of the byte
- Raises
ValueError – index too large
- get_mmap_size(self) → unsigned long¶
- Returns
how many bytes are mmapped
- Return type
int
- get_slice_of_piece_at(self, unsigned int index, int start, int stop) → bytes¶ Return a slice of data from the given element.
- Parameters
index – index of the element
start – starting offset of data
stop – stopping offset of data
- Returns
a byte slice
- get_slice_of_piece_starting_at(self, unsigned int index, int start) → bytes¶ Return a slice of data from the given element, starting at the given offset and running to the end.
- Parameters
index – index of the element
start – starting index
- Returns
a byte slice
- get_timestamp_at(self, unsigned int index) → unsigned long long¶ Return the timestamp at a particular location.
Passing an invalid index will result in undefined behaviour.
- Parameters
index – index of element
- Returns
the timestamp
- get_value_at(self, unsigned int index) → bytes¶ Return only the value at a particular index, numbered from 0.
- Returns
value at given index
- iterate_indices(self, unsigned long starting_entry, unsigned long stopping_entry)¶ Return a partial iterator starting at starting_entry and ending at stopping_entry (exclusive).
- Parameters
starting_entry – index of starting entry
stopping_entry – index of stopping entry
- Returns
an iterator
- Return type
tp.Iterator[tp.Tuple[int, bytes]]
- switch_to_descriptor_based_access(self) → int¶ Switch self to descriptor-based access instead of mmap.
No-op if already in descriptor-based mode.
- switch_to_mmap_based_access(self) → int¶ Switch self to mmap-based access instead of descriptor-based.
No-op if already in mmap mode.
- Raises
Corruption – unable to mmap file due to an unrecoverable error
Data stored in files is little-endian.
A file storing a chunk is laid out as follows:
4 bytes: unsigned int - block size
- repeated:
8 bytes: unsigned long long - timestamp
block_size bytes of data
The file is padded to page_size with zeros, and the last four bytes hold the amount of entries as an unsigned long.
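As an illustration, a sketch of decoding that layout with Python's struct module (the file name is made up; in practice, go through TimeSeries rather than reading chunk files yourself):
>>> import struct
>>> raw = open('0.chunk', 'rb').read()                       # hypothetical chunk file
>>> block_size, = struct.unpack_from('<I', raw, 0)           # 4-byte little-endian block size
>>> first_ts, = struct.unpack_from('<Q', raw, 4)             # first 8-byte timestamp
>>> first_data = raw[12:12 + block_size]                     # its block_size bytes of data
>>> entries, = struct.unpack_from('<I', raw, len(raw) - 4)   # entry count in the last 4 bytes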
Variable length series¶
New in version 0.5.
How does it work?¶
They work by breaking down your data into smaller pieces and storing them in separate series, prefixed with their length.
For each series you specify a so-called length profile. It is a list of ints, each representing the block size of the next series created. If an entry cannot fit in the already created series, a new one will be created. Note that the last entry of this list repeats forever, so if you, for example, put 1024 bytes of data into a varlen series with a length profile of [10, 255], a total of 5 normal time series will be created to accommodate it, with block sizes of:
* 10
* 255
* 255
* 255
* 255
Note that an entry is written to only as many series as it needs to fit. For example, an 8-byte piece of data would be written only to the first series.
Each entry is also prefixed by its length, so the actual block size of the first series is larger by that amount. The size of this length field is set by an extra parameter called size_struct; it represents an unsigned number.
Note that the only valid sizes of size_struct are:
* 1 for a maximum length of 255
* 2 for a maximum length of 65535
* 3 for a maximum length of 16777215
* 4 for a maximum length of 4294967295
Also note that variable length series live in a different namespace than standard time series, so you can name them the same.
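To make the splitting rule above concrete, here is a plain-Python sketch (not part of the API) that computes which block sizes a payload of a given size will occupy:
>>> def blocks_needed(length_profile, payload_size):
>>>     sizes, i = [], 0
>>>     while payload_size > 0:
>>>         block = length_profile[min(i, len(length_profile) - 1)]  # last entry repeats forever
>>>         sizes.append(block)
>>>         payload_size -= block
>>>         i += 1
>>>     return sizes
>>> blocks_needed([10, 255], 1024)    # the 1024-byte example from above
[10, 255, 255, 255, 255]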
Accessing them¶
Use methods tempsdb.database.Database.create_varlen_series()
and
tempsdb.database.Database.get_varlen_series()
to obtain instances of following class:
- class tempsdb.varlen.VarlenSeries(unicode path: str, unicode name: str)¶
A time series housing variable length data.
It does that by splitting the data into chunks and encoding them in multiple series.
- Parameters
path – path to directory containing the series
name – name of the series
- append(self, unsigned long long timestamp, bytes data) → int¶ Append an entry to the series.
- Parameters
timestamp – timestamp to append it with
data – data to write
- Raises
ValueError – too long an entry
- close(self) → int¶ Close this series.
No-op if already closed.
- Raises
StillOpen – some references are being held
- delete(self) → int¶ Erase this variable length series from the disk.
Closes this series as a side effect.
- get_maximum_length(self) → long long¶
- Returns
maximum length of an element capable of being stored in this series
- iterate_range(self, unsigned long long start, unsigned long long stop, bool direct_bytes=False) → VarlenIterator¶ Return an iterator with the data.
- last_entry_synced¶
- Returns
timestamp of the last entry synchronized. Starting value is 0
- mark_synced_up_to(self, unsigned long long timestamp) → int¶ Mark the series as synchronized up to a particular timestamp.
- Parameters
timestamp – timestamp of synchronization
- trim(self, unsigned long long timestamp) → int¶ Try to delete all entries older than timestamp.
- Parameters
timestamp – timestamp that separates alive entries from the dead
- class tempsdb.varlen.VarlenIterator(VarlenSeries parent: VarlenSeries, start: int, stop: int, direct_bytes: bool = False)¶
A result of a varlen series query.
This iterator will close itself when completed. If you break out of its iteration, please close it yourself via close(). If you forget to do that, a warning will be issued and the destructor will close it automatically.
- Parameters
parent – parent series
start – timestamp to start at
stop – timestamp to stop at
direct_bytes – whether to iterate with bytes values instead of VarlenEntry. Note that setting this to True will result in a performance drop, since it will copy, but it should be faster if your typical entry is less than 20 bytes.
- close(self) → int¶ Close this iterator and release all the resources.
No-op if already closed.
- get_next(self) → VarlenEntry¶ Return the next element of the iterator, or None if no more are available.
- class tempsdb.varlen.VarlenEntry(VarlenSeries parent: VarlenSeries, chunks: tp.List[Chunk], item_no: tp.List[int])¶
An object representing a value.
It is preferable for a proxy to exist instead of copying the data. This helps make tempsdb far more zero-copy, but it's worth it only if your values are routinely longer than 20-40 bytes.
This behaves like a bytes object: in particular, it can be sliced and iterated over, and its length can be obtained. It also overloads __bytes__, and is directly comparable, hashable and usable in a boolean context.
This acquires a reference to the chunk it refers to, and releases it upon destruction.
Once to_bytes() is called, its result will be cached.
- close(self) → int¶ Close this object and release all the references.
It is not necessary to call this, since the destructor will call it for you.
Warning
Do not let your VarlenEntry objects outlive the iterator itself! It will then be impossible to close the iterator.
- endswith(self, bytes v) → bool¶ Check whether this sequence ends with the provided bytes.
This will run faster than converting the whole entry to bytes first, since it will fetch only the required amount of bytes.
- Parameters
v – bytes to check
- Returns
whether the sequence ends with provided bytes
- get_byte_at(self, int index) → int¶ Return the byte at a particular index.
- Parameters
index – index of the byte
- Returns
the value of the byte
- Raises
ValueError – index too large
- length(self) → int¶
- Returns
the length of this entry
- slice(self, int start, int stop) → bytes¶ Return a slice of the entry.
- Parameters
start – position to start at
stop – position to stop at
- Returns
a slice of this entry
- Raises
ValueError – stop was smaller than start or indices were invalid
- startswith(self, bytes v) → bool¶ Check whether this sequence starts with the provided bytes.
This will run faster than converting the whole entry to bytes first, since it will fetch only the required amount of bytes.
- Parameters
v – bytes to check
- Returns
whether the sequence starts with provided bytes
- timestamp(self) → unsigned long long¶
- Returns
timestamp assigned to this entry
- to_bytes(self) → bytes¶
- Returns
value as bytes
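Putting the varlen API together, a minimal sketch (the name, length profile and payload are made up):
>>> vs = db.create_varlen_series('messages', [10, 255], 2, 1000)
>>> vs.append(1000, b'hello world')
>>> it = vs.iterate_range(0, 5000)
>>> entry = it.get_next()
>>> print(entry.timestamp(), entry.to_bytes())
>>> entry.close()    # do not let entries outlive the iterator
>>> it.close()
>>> vs.close()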
Integration with Satella’s MemoryPressureManager¶
This library integrates itself with Satella's MemoryPressureManager.
While remaining in severity 1, it will close non-required chunks every 30 seconds.
To attach an MPM to a database, use tempsdb.database.Database.register_memory_pressure_manager().
Series will automatically inherit the parent database’s MemoryPressureManager.
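A minimal sketch of wiring it up (assuming satella is installed; how the MemoryPressureManager itself is configured is satella's business, not tempsdb's):
>>> from satella.instrumentation.memory import MemoryPressureManager
>>> mpm = MemoryPressureManager()
>>> db.register_memory_pressure_manager(mpm)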
This is an append-only embedded time series library written in Cython.
It tries to use mmap for reads and writes, and in general is as zero-copy as possible (i.e. the only time data is deserialized is when a particular entry is read). It also uses iterators.
It stores time series with an 8-byte timestamp and a fixed length of data. So no variable encoding for you!
New in version 0.2.
When mmap fails due to memory issues, this falls back to a slower fwrite()/fread() implementation. You can also manually select the descriptor-based implementation if you want to.
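For example, to opt into descriptor-based access up front instead of waiting for the fallback, use the flag documented above:
>>> series = db.get_series('temperature', use_descriptor_based_access=True)
or switch a series that is already open via disable_mmap().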