clapper: Implement harvest caching

Using recently added local cache functionality, store harvests
that have expiration date. With this, next time the same URI is
selected for playback we can read it from cache, skipping loading
of any enhancer plugins and doing network requests.

This also works nicely with Clapper discoverer feature.
Making queued items be fetched and cached ahead of playback.
This commit is contained in:
Rafał Dzięgiel
2025-05-18 20:24:08 +02:00
parent b30d53d8ce
commit 6ddb53252a
7 changed files with 409 additions and 42 deletions

View File

@@ -39,12 +39,18 @@ gint clapper_cache_read_int (const gchar **data);
G_GNUC_INTERNAL
guint clapper_cache_read_uint (const gchar **data);
G_GNUC_INTERNAL
gint64 clapper_cache_read_int64 (const gchar **data);
G_GNUC_INTERNAL
gdouble clapper_cache_read_double (const gchar **data);
G_GNUC_INTERNAL
const gchar * clapper_cache_read_string (const gchar **data);
G_GNUC_INTERNAL
const guint8 * clapper_cache_read_data (const gchar **data, gsize *size);
G_GNUC_INTERNAL
GType clapper_cache_read_enum (const gchar **data);
@@ -69,12 +75,18 @@ void clapper_cache_store_int (GByteArray *bytes, gint val);
G_GNUC_INTERNAL
void clapper_cache_store_uint (GByteArray *bytes, guint val);
G_GNUC_INTERNAL
void clapper_cache_store_int64 (GByteArray *bytes, gint64 val);
G_GNUC_INTERNAL
void clapper_cache_store_double (GByteArray *bytes, gdouble val);
G_GNUC_INTERNAL
void clapper_cache_store_string (GByteArray *bytes, const gchar *val);
G_GNUC_INTERNAL
void clapper_cache_store_data (GByteArray *bytes, const guint8 *val, gsize val_size);
G_GNUC_INTERNAL
void clapper_cache_store_enum (GByteArray *bytes, GType enum_type);

View File

@@ -111,6 +111,15 @@ clapper_cache_read_uint (const gchar **data)
return val;
}
inline gint64
clapper_cache_read_int64 (const gchar **data)
{
gint64 val = *(const gint64 *) *data;
*data += sizeof (gint64);
return val;
}
inline gdouble
clapper_cache_read_double (const gchar **data)
{
@@ -134,6 +143,22 @@ clapper_cache_read_string (const gchar **data)
return str;
}
inline const guint8 *
clapper_cache_read_data (const gchar **data, gsize *size)
{
const guint8 *val = NULL;
*size = *(const gsize *) *data;
*data += sizeof (gsize);
if (G_LIKELY (*size > 0)) {
val = (const guint8 *) *data;
*data += *size;
}
return val;
}
inline GType
clapper_cache_read_enum (const gchar **data)
{
@@ -332,6 +357,12 @@ clapper_cache_store_uint (GByteArray *bytes, guint val)
g_byte_array_append (bytes, (const guint8 *) &val, sizeof (guint));
}
inline void
clapper_cache_store_int64 (GByteArray *bytes, gint64 val)
{
g_byte_array_append (bytes, (const guint8 *) &val, sizeof (gint64));
}
inline void
clapper_cache_store_double (GByteArray *bytes, gdouble val)
{
@@ -349,6 +380,14 @@ clapper_cache_store_string (GByteArray *bytes, const gchar *val)
g_byte_array_append (bytes, (const guint8 *) val, strlen (val) + 1);
}
inline void
clapper_cache_store_data (GByteArray *bytes, const guint8 *val, gsize val_size)
{
g_byte_array_append (bytes, (const guint8 *) &val_size, sizeof (gsize));
if (G_LIKELY (val_size > 0))
g_byte_array_append (bytes, val, val_size);
}
inline void
clapper_cache_store_enum (GByteArray *bytes, GType enum_type)
{

View File

@@ -21,6 +21,7 @@
#include <glib.h>
#include <glib-object.h>
#include <gst/gst.h>
#include "clapper-enhancer-proxy.h"
@@ -45,6 +46,9 @@ G_GNUC_INTERNAL
GObject * clapper_enhancer_proxy_get_peas_info (ClapperEnhancerProxy *proxy);
G_GNUC_INTERNAL
void clapper_enhancer_proxy_apply_current_config_to_enhancer (ClapperEnhancerProxy *proxy, GObject *enhancer);
GstStructure * clapper_enhancer_proxy_make_current_config (ClapperEnhancerProxy *proxy);
G_GNUC_INTERNAL
void clapper_enhancer_proxy_apply_config_to_enhancer (ClapperEnhancerProxy *proxy, const GstStructure *config, GObject *enhancer);
G_END_DECLS

View File

@@ -507,15 +507,18 @@ _apply_config_cb (GQuark field_id, const GValue *value, GObject *enhancer)
return TRUE;
}
void
clapper_enhancer_proxy_apply_current_config_to_enhancer (ClapperEnhancerProxy *self, GObject *enhancer)
/*
* clapper_enhancer_proxy_make_current_config:
*
* Returns: (transfer full) (nullable): Current merged global and local config as #GstStructure.
*/
GstStructure *
clapper_enhancer_proxy_make_current_config (ClapperEnhancerProxy *self)
{
GSettings *settings = clapper_enhancer_proxy_get_settings (self);
GstStructure *merged_config = NULL;
guint i;
GST_DEBUG_OBJECT (self, "Applying config to enhancer");
/* Lock here to ensure consistent local config */
GST_OBJECT_LOCK (self);
@@ -587,13 +590,14 @@ clapper_enhancer_proxy_apply_current_config_to_enhancer (ClapperEnhancerProxy *s
g_clear_object (&settings);
/* Nothing if no configurable properties
* or all have default values */
if (merged_config) {
gst_structure_foreach (merged_config, (GstStructureForeachFunc) _apply_config_cb, enhancer);
gst_structure_free (merged_config);
}
return merged_config;
}
void
clapper_enhancer_proxy_apply_config_to_enhancer (ClapperEnhancerProxy *self, const GstStructure *config, GObject *enhancer)
{
GST_DEBUG_OBJECT (self, "Applying config to enhancer");
gst_structure_foreach (config, (GstStructureForeachFunc) _apply_config_cb, enhancer);
GST_DEBUG_OBJECT (self, "Enhancer config applied");
}

View File

@@ -23,6 +23,7 @@
#include <gst/tag/tag.h>
#include "clapper-harvest.h"
#include "clapper-enhancer-proxy.h"
G_BEGIN_DECLS
@@ -32,4 +33,10 @@ ClapperHarvest * clapper_harvest_new (void);
G_GNUC_INTERNAL
gboolean clapper_harvest_unpack (ClapperHarvest *harvest, GstBuffer **buffer, gsize *buf_size, GstCaps **caps, GstTagList **tags, GstToc **toc, GstStructure **headers);
G_GNUC_INTERNAL
gboolean clapper_harvest_fill_from_cache (ClapperHarvest *harvest, ClapperEnhancerProxy *proxy, const GstStructure *config, GUri *uri);
G_GNUC_INTERNAL
void clapper_harvest_export_to_cache (ClapperHarvest *harvest, ClapperEnhancerProxy *proxy, const GstStructure *config, GUri *uri);
G_END_DECLS

View File

@@ -31,7 +31,11 @@
* https://gitlab.freedesktop.org/gstreamer/gstreamer/-/issues/2867
*/
#include "config.h"
#include "clapper-harvest-private.h"
#include "clapper-cache-private.h"
#include "clapper-utils.h"
#define GST_CAT_DEFAULT clapper_harvest_debug
GST_DEBUG_CATEGORY_STATIC (GST_CAT_DEFAULT);
@@ -121,6 +125,302 @@ clapper_harvest_unpack (ClapperHarvest *self,
return TRUE;
}
/* Custom implementation due to the lack of TOC serialization in GStreamer */
static void
_harvest_fill_toc_from_cache (ClapperHarvest *self, const gchar **data)
{
guint i, n_entries;
n_entries = clapper_cache_read_uint (data);
for (i = 0; i < n_entries; ++i) {
guint j, n_subentries;
n_subentries = clapper_cache_read_uint (data);
for (j = 0; j < n_subentries; ++j) {
GstTocEntryType type;
const gchar *title;
gdouble start, end;
type = (GstTocEntryType) clapper_cache_read_int (data);
title = clapper_cache_read_string (data);
start = clapper_cache_read_double (data);
end = clapper_cache_read_double (data);
clapper_harvest_toc_add (self, type, title, start, end);
}
}
}
static void
_harvest_store_toc_to_cache (ClapperHarvest *self, GByteArray *bytes)
{
GList *list = NULL, *el;
guint n_entries = 0;
if (self->toc) {
list = gst_toc_get_entries (self->toc);
n_entries = g_list_length (list);
}
clapper_cache_store_uint (bytes, n_entries);
for (el = list; el; el = g_list_next (el)) {
const GstTocEntry *entry = (const GstTocEntry *) el->data;
GList *subentries, *sub_el;
guint n_subentries;
subentries = gst_toc_entry_get_sub_entries (entry);
n_subentries = g_list_length (subentries);
clapper_cache_store_uint (bytes, n_subentries);
for (sub_el = subentries; sub_el; sub_el = g_list_next (sub_el)) {
const GstTocEntry *subentry = (const GstTocEntry *) sub_el->data;
GstTagList *tags;
gint64 start = 0, end = 0;
gdouble start_dbl, end_dbl;
const gchar *title = NULL;
clapper_cache_store_int (bytes, (gint) gst_toc_entry_get_entry_type (subentry));
if ((tags = gst_toc_entry_get_tags (subentry)))
gst_tag_list_peek_string_index (tags, GST_TAG_TITLE, 0, &title);
clapper_cache_store_string (bytes, title);
gst_toc_entry_get_start_stop_times (subentry, &start, &end);
start_dbl = ((gdouble) start) / GST_SECOND;
end_dbl = (end >= 0) ? ((gdouble) end) / GST_SECOND : -1;
clapper_cache_store_double (bytes, start_dbl);
clapper_cache_store_double (bytes, end_dbl);
}
}
}
static inline gchar *
_build_cache_filename (ClapperEnhancerProxy *proxy, GUri *uri)
{
gchar *uri_str = g_uri_to_string (uri);
gchar name[15];
g_snprintf (name, sizeof (name), "%u.bin", g_str_hash (uri_str));
g_free (uri_str);
return g_build_filename (g_get_user_cache_dir (), CLAPPER_API_NAME,
"enhancers", clapper_enhancer_proxy_get_module_name (proxy),
"harvests", name, NULL);
}
/* NOTE: On failure, this function must not modify harvest! */
gboolean
clapper_harvest_fill_from_cache (ClapperHarvest *self, ClapperEnhancerProxy *proxy,
const GstStructure *config, GUri *uri)
{
GMappedFile *mapped_file;
GstStructure *config_cached = NULL;
GError *error = NULL;
gchar *filename;
const gchar *data, *read_str;
const guint8 *buf_data;
guint8 *buf_copy;
gsize buf_size;
gint64 epoch_cached, epoch_now = 0;
gdouble exp_seconds;
gboolean changed, read_ok = FALSE;
filename = _build_cache_filename (proxy, uri);
GST_DEBUG_OBJECT (self, "Importing harvest from cache file: \"%s\"", filename);
mapped_file = clapper_cache_open (filename, &data, &error);
g_free (filename);
if (!mapped_file) {
/* No error if cache disabled or version mismatch */
if (error) {
if (error->domain == G_FILE_ERROR && error->code == G_FILE_ERROR_NOENT)
GST_DEBUG_OBJECT (self, "No cached harvest found");
else
GST_ERROR_OBJECT (self, "Could not use cached harvest, reason: %s", error->message);
g_error_free (error);
}
return FALSE;
}
/* Plugin version check */
if (g_strcmp0 (clapper_cache_read_string (&data),
clapper_enhancer_proxy_get_version (proxy)) != 0)
goto finish; // no error printing here
if (G_LIKELY ((epoch_cached = clapper_cache_read_int64 (&data)) > 0)) {
GDateTime *date = g_date_time_new_now_utc ();
epoch_now = g_date_time_to_unix (date);
g_date_time_unref (date);
}
/* Check if expired */
if ((exp_seconds = (gdouble) (epoch_cached - epoch_now)) <= 0) {
GST_DEBUG_OBJECT (self, "Cached harvest expired"); // expiration is not an error
goto finish;
}
GST_DEBUG_OBJECT (self, "Cached harvest expiration in %" CLAPPER_TIME_FORMAT,
CLAPPER_TIME_ARGS (exp_seconds));
/* Read last used config to generate cache data */
if ((read_str = clapper_cache_read_string (&data)))
config_cached = gst_structure_from_string (read_str, NULL);
/* Compare used config when cache was generated to the current one */
changed = (config_cached && config)
? !gst_structure_is_equal (config_cached, config)
: (config_cached != config);
gst_clear_structure (&config_cached);
if (changed) {
GST_DEBUG_OBJECT (self, "Enhancer config differs from the last time");
goto finish;
}
/* Read media type */
read_str = clapper_cache_read_string (&data);
if (G_UNLIKELY (read_str == NULL)) {
GST_ERROR_OBJECT (self, "Could not read media type from cache file");
goto finish;
}
/* Read buffer data */
buf_data = clapper_cache_read_data (&data, &buf_size);
if (G_UNLIKELY (buf_data == NULL)) {
GST_ERROR_OBJECT (self, "Could not read buffer data from cache");
goto finish;
}
/* Fill harvest */
buf_copy = g_memdup2 (buf_data, buf_size);
if (!clapper_harvest_fill (self, read_str, buf_copy, buf_size))
goto finish;
/* Read tags */
read_str = clapper_cache_read_string (&data);
if (read_str && (self->tags = gst_tag_list_new_from_string (read_str))) {
GST_LOG_OBJECT (self, "Read %s", read_str);
gst_tag_list_set_scope (self->tags, GST_TAG_SCOPE_GLOBAL);
}
/* Read TOC */
_harvest_fill_toc_from_cache (self, &data);
/* Read headers */
read_str = clapper_cache_read_string (&data);
if (read_str && (self->headers = gst_structure_from_string (read_str, NULL)))
GST_LOG_OBJECT (self, "Read %s", read_str);
read_ok = TRUE;
finish:
g_mapped_file_unref (mapped_file);
if (!read_ok)
return FALSE;
GST_DEBUG_OBJECT (self, "Filled harvest from cache");
return TRUE;
}
void
clapper_harvest_export_to_cache (ClapperHarvest *self, ClapperEnhancerProxy *proxy,
const GstStructure *config, GUri *uri)
{
GByteArray *bytes;
const GstStructure *caps_structure;
gchar *filename, *temp_str = NULL;
gboolean data_ok = TRUE;
/* No caching if no expiration date set */
if (self->exp_epoch <= 0)
return;
/* Might happen if extractor extract function implementation
* returns %TRUE without filling harvest properly */
if (G_UNLIKELY (self->caps == NULL || self->buffer == NULL))
return; // no data to cache
bytes = clapper_cache_create ();
/* If cache disabled */
if (G_UNLIKELY (bytes == NULL))
return;
filename = _build_cache_filename (proxy, uri);
GST_DEBUG_OBJECT (self, "Exporting harvest to cache file: \"%s\"", filename);
/* Store enhancer version that generated harvest */
clapper_cache_store_string (bytes, clapper_enhancer_proxy_get_version (proxy));
/* Store expiration date */
clapper_cache_store_int64 (bytes, self->exp_epoch);
/* Store config used to generate harvest */
if (config)
temp_str = gst_structure_to_string (config);
clapper_cache_store_string (bytes, temp_str); // NULL when no config
g_clear_pointer (&temp_str, g_free);
/* Store media type */
caps_structure = gst_caps_get_structure (self->caps, 0);
if (G_LIKELY (caps_structure != NULL)) {
clapper_cache_store_string (bytes, gst_structure_get_name (caps_structure));
} else {
GST_ERROR_OBJECT (self, "Cannot cache empty caps");
data_ok = FALSE;
}
if (G_LIKELY (data_ok)) {
GstMemory *mem;
GstMapInfo map_info;
/* Store buffer data */
mem = gst_buffer_peek_memory (self->buffer, 0);
if (G_LIKELY (gst_memory_map (mem, &map_info, GST_MAP_READ))) {
clapper_cache_store_data (bytes, map_info.data, map_info.size);
gst_memory_unmap (mem, &map_info);
} else {
GST_ERROR_OBJECT (self, "Could not map harvest buffer for reading");
data_ok = FALSE;
}
}
if (G_LIKELY (data_ok)) {
GError *error = NULL;
/* Store tags */
if (self->tags)
temp_str = gst_tag_list_to_string (self->tags);
clapper_cache_store_string (bytes, temp_str);
g_clear_pointer (&temp_str, g_free);
/* Store TOC */
_harvest_store_toc_to_cache (self, bytes);
/* Store headers */
if (self->headers)
temp_str = gst_structure_to_string (self->headers);
clapper_cache_store_string (bytes, temp_str);
g_clear_pointer (&temp_str, g_free);
if (clapper_cache_write (filename, bytes, &error)) {
GST_DEBUG_OBJECT (self, "Successfully exported harvest to cache file");
} else if (error) {
GST_ERROR_OBJECT (self, "Could not cache harvest, reason: %s", error->message);
g_error_free (error);
}
}
g_free (filename);
g_byte_array_free (bytes, TRUE);
}
/**
* clapper_harvest_fill:
* @harvest: a #ClapperHarvest
@@ -157,7 +457,7 @@ clapper_harvest_fill (ClapperHarvest *self, const gchar *media_type, gpointer da
return FALSE;
}
if (gst_debug_category_get_threshold (GST_CAT_DEFAULT) >= GST_LEVEL_DEBUG) {
if (gst_debug_category_get_threshold (GST_CAT_DEFAULT) >= GST_LEVEL_LOG) {
gboolean is_printable = (strcmp (media_type, "application/dash+xml") == 0)
|| (strcmp (media_type, "application/x-hls") == 0)
|| (strcmp (media_type, "text/uri-list") == 0);
@@ -168,7 +468,7 @@ clapper_harvest_fill (ClapperHarvest *self, const gchar *media_type, gpointer da
data_str = g_new0 (gchar, size + 1);
memcpy (data_str, data, size);
GST_DEBUG_OBJECT (self, "Filled with data:\n%s", data_str);
GST_LOG_OBJECT (self, "Filled with data:\n%s", data_str);
g_free (data_str);
}
@@ -330,7 +630,7 @@ clapper_harvest_toc_add (ClapperHarvest *self, GstTocEntryType type,
g_snprintf (edition, sizeof (edition), "0%i", type);
g_snprintf (id, sizeof (id), "%s.%" G_GUINT16_FORMAT, id_prefix, nth_entry);
GST_DEBUG_OBJECT (self, "Inserting TOC %s: \"%s\""
GST_LOG_OBJECT (self, "Inserting TOC %s: \"%s\""
" (%" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT ")",
id, title, start_time, end_time);
@@ -382,7 +682,7 @@ clapper_harvest_headers_set (ClapperHarvest *self, const gchar *key, ...)
while (key != NULL) {
const gchar *val = va_arg (args, const gchar *);
GST_DEBUG_OBJECT (self, "Set header, \"%s\": \"%s\"", key, val);
GST_LOG_OBJECT (self, "Set header, \"%s\": \"%s\"", key, val);
gst_structure_set (self->headers, key, G_TYPE_STRING, val, NULL);
key = va_arg (args, const gchar *);
}
@@ -411,7 +711,7 @@ clapper_harvest_headers_set_value (ClapperHarvest *self, const gchar *key, const
_ensure_headers (self);
GST_DEBUG_OBJECT (self, "Set header, \"%s\": \"%s\"", key, g_value_get_string (value));
GST_LOG_OBJECT (self, "Set header, \"%s\": \"%s\"", key, g_value_get_string (value));
gst_structure_set_value (self->headers, key, value);
}
@@ -436,7 +736,7 @@ clapper_harvest_set_expiration_date_utc (ClapperHarvest *self, GDateTime *date_u
g_return_if_fail (date_utc != NULL);
self->exp_epoch = g_date_time_to_unix (date_utc);
GST_DEBUG_OBJECT (self, "Expiration epoch: %" G_GINT64_FORMAT, self->exp_epoch);
GST_LOG_OBJECT (self, "Expiration epoch: %" G_GINT64_FORMAT, self->exp_epoch);
}
/**
@@ -462,7 +762,8 @@ clapper_harvest_set_expiration_seconds (ClapperHarvest *self, gdouble seconds)
g_return_if_fail (CLAPPER_IS_HARVEST (self));
GST_DEBUG_OBJECT (self, "Set expiration seconds: %.3lfs", seconds);
GST_LOG_OBJECT (self, "Set expiration in %" CLAPPER_TIME_FORMAT,
CLAPPER_TIME_ARGS (seconds));
date = g_date_time_new_now_utc ();
date_epoch = g_date_time_add_seconds (date, seconds);
@@ -471,7 +772,7 @@ clapper_harvest_set_expiration_seconds (ClapperHarvest *self, gdouble seconds)
self->exp_epoch = g_date_time_to_unix (date_epoch);
g_date_time_unref (date_epoch);
GST_DEBUG_OBJECT (self, "Expiration epoch: %" G_GINT64_FORMAT, self->exp_epoch);
GST_LOG_OBJECT (self, "Expiration epoch: %" G_GINT64_FORMAT, self->exp_epoch);
}
static void

View File

@@ -57,7 +57,7 @@ clapper_enhancer_director_extract_in_thread (ClapperEnhancerDirectorData *data)
ClapperEnhancerDirector *self = data->director;
GList *el;
ClapperHarvest *harvest = NULL;
gboolean success = FALSE, cached = FALSE;
gboolean success = FALSE;
GST_DEBUG_OBJECT (self, "Extraction start");
@@ -65,22 +65,22 @@ clapper_enhancer_director_extract_in_thread (ClapperEnhancerDirectorData *data)
if (g_cancellable_is_cancelled (data->cancellable))
return NULL;
/* TODO: Cache lookup */
if (cached) {
// if ((success = fill harvest from cache))
// return harvest;
}
GST_DEBUG_OBJECT (self, "Enhancer proxies for URI: %u",
g_list_length (data->filtered_proxies));
for (el = data->filtered_proxies; el; el = g_list_next (el)) {
ClapperEnhancerProxy *proxy = CLAPPER_ENHANCER_PROXY_CAST (el->data);
ClapperExtractable *extractable = NULL;
GstStructure *config;
/* Check just before extract */
if (g_cancellable_is_cancelled (data->cancellable))
harvest = clapper_harvest_new (); // fresh harvest for each iteration
config = clapper_enhancer_proxy_make_current_config (proxy);
if ((success = clapper_harvest_fill_from_cache (harvest, proxy, config, data->uri))
|| g_cancellable_is_cancelled (data->cancellable)) { // Check before extract
gst_clear_structure (&config);
break;
}
#if CLAPPER_WITH_ENHANCERS_LOADER
extractable = CLAPPER_EXTRACTABLE_CAST (
@@ -88,32 +88,32 @@ clapper_enhancer_director_extract_in_thread (ClapperEnhancerDirectorData *data)
#endif
if (G_LIKELY (extractable != NULL)) {
clapper_enhancer_proxy_apply_current_config_to_enhancer (proxy, (GObject *) extractable);
harvest = clapper_harvest_new (); // fresh harvest for each extractable
clapper_enhancer_proxy_apply_config_to_enhancer (proxy, config, (GObject *) extractable);
success = clapper_extractable_extract (extractable, data->uri,
harvest, data->cancellable, data->error);
gst_object_unref (extractable);
/* We are done with extractable, but keep its harvest */
if (success)
break;
/* We are done with extractable, but keep harvest and try to cache it */
if (success) {
if (!g_cancellable_is_cancelled (data->cancellable))
clapper_harvest_export_to_cache (harvest, proxy, config, data->uri);
/* Clear harvest and try again with next enhancer */
g_clear_object (&harvest);
gst_clear_structure (&config);
break;
}
}
/* Cleanup to try again with next enhancer */
g_clear_object (&harvest);
gst_clear_structure (&config);
}
/* Cancelled during extract */
/* Cancelled during extraction or exporting to cache */
if (g_cancellable_is_cancelled (data->cancellable))
success = FALSE;
if (success) {
if (!cached) {
/* TODO: Store in cache */
}
} else {
if (!success) {
gst_clear_object (&harvest);
/* Ensure we have some error set on failure */