clapper: Add simple enhancer proxy jobs management

Allows to mark a job with a given ID as started, then when another thread/instance
wants to process the same ID we can detect it and await previous job finish.
This is especially useful for not extracting the same URI concurrently while
still allowing to process multiple different URIs at once without blocking.
This commit is contained in:
Rafał Dzięgiel
2025-11-06 18:44:32 +01:00
parent b41949fb87
commit acf99586ab
3 changed files with 108 additions and 2 deletions

View File

@@ -53,4 +53,10 @@ GstStructure * clapper_enhancer_proxy_make_current_config (ClapperEnhancerProxy
G_GNUC_INTERNAL
void clapper_enhancer_proxy_apply_config_to_enhancer (ClapperEnhancerProxy *proxy, const GstStructure *config, GObject *enhancer);
G_GNUC_INTERNAL
void clapper_enhancer_proxy_await_job_start (ClapperEnhancerProxy *proxy, guint job_id);
G_GNUC_INTERNAL
void clapper_enhancer_proxy_remove_job (ClapperEnhancerProxy *proxy, guint job_id);
G_END_DECLS

View File

@@ -95,6 +95,10 @@ struct _ClapperEnhancerProxy
* so store schema instead */
GSettingsSchema *schema;
gboolean schema_init_done;
GArray *jobs;
GMutex job_lock;
GCond job_cond;
};
enum
@@ -599,6 +603,75 @@ clapper_enhancer_proxy_apply_config_to_enhancer (ClapperEnhancerProxy *self, con
GST_DEBUG_OBJECT (self, "Enhancer config applied");
}
/* Needs a "job_lock" */
static gboolean
_find_job_unlocked (ClapperEnhancerProxy *self, guint job_id, guint *index)
{
guint i;
for (i = 0; i < self->jobs->len; ++i) {
guint *stored_id = &g_array_index (self->jobs, guint, i);
if (job_id == *stored_id) {
if (index)
*index = i;
return TRUE;
}
}
return FALSE;
}
/*
* clapper_enhancer_proxy_await_job_start:
*
* Can be used to prevent starting a job with the same ID concurrently,
* while allowing to do that for unique IDs.
*
* After each start, `clapper_enhancer_proxy_remove_job` must be
* called when job is considered to be done.
*/
void
clapper_enhancer_proxy_await_job_start (ClapperEnhancerProxy *self, guint job_id)
{
GST_LOG_OBJECT (self, "Requested job ID: %u", job_id);
GST_OBJECT_LOCK (self);
if (!self->jobs) {
self->jobs = g_array_new (FALSE, FALSE, sizeof (guint));
g_mutex_init (&self->job_lock);
g_cond_init (&self->job_cond);
}
GST_OBJECT_UNLOCK (self);
g_mutex_lock (&self->job_lock);
while (_find_job_unlocked (self, job_id, NULL))
g_cond_wait (&self->job_cond, &self->job_lock);
g_array_append_val (self->jobs, job_id);
GST_LOG_OBJECT (self, "Added job ID: %u", job_id);
g_mutex_unlock (&self->job_lock);
}
void
clapper_enhancer_proxy_remove_job (ClapperEnhancerProxy *self, guint job_id)
{
guint index;
g_mutex_lock (&self->job_lock);
if (G_LIKELY (_find_job_unlocked (self, job_id, &index))) {
g_array_remove_index_fast (self->jobs, index);
GST_LOG_OBJECT (self, "Removed job ID: %u", job_id);
g_cond_broadcast (&self->job_cond);
} else {
GST_ERROR_OBJECT (self, "Requested removal of nonexistent job ID: %u", job_id);
}
g_mutex_unlock (&self->job_lock);
}
/**
* clapper_enhancer_proxy_get_friendly_name:
* @proxy: a #ClapperEnhancerProxy
@@ -1188,6 +1261,12 @@ clapper_enhancer_proxy_finalize (GObject *object)
gst_clear_structure (&self->local_config);
g_clear_pointer (&self->schema, g_settings_schema_unref);
if (self->jobs) {
g_array_unref (self->jobs);
g_mutex_clear (&self->job_lock);
g_cond_clear (&self->job_cond);
}
G_OBJECT_CLASS (parent_class)->finalize (object);
}

View File

@@ -66,6 +66,8 @@ clapper_enhancer_director_extract_in_thread (ClapperEnhancerDirectorData *data)
ClapperEnhancerDirector *self = data->director;
GList *el;
ClapperHarvest *harvest = NULL;
gchar *uri_str;
guint job_id;
gboolean success = FALSE;
GST_DEBUG_OBJECT (self, "Extraction start");
@@ -74,20 +76,36 @@ clapper_enhancer_director_extract_in_thread (ClapperEnhancerDirectorData *data)
if (g_cancellable_is_cancelled (data->cancellable))
return NULL;
GST_DEBUG_OBJECT (self, "Enhancer proxies for URI: %u",
g_list_length (data->filtered_proxies));
uri_str = g_uri_to_string (data->uri);
job_id = g_str_hash (uri_str);
GST_DEBUG_OBJECT (self, "Extracting URI: \"%s\", compatible enhancers: %u",
uri_str, g_list_length (data->filtered_proxies));
g_free (uri_str);
for (el = data->filtered_proxies; el; el = g_list_next (el)) {
ClapperEnhancerProxy *proxy = CLAPPER_ENHANCER_PROXY_CAST (el->data);
ClapperExtractable *extractable = NULL;
GstStructure *config;
/* Ensures that we do not start extraction of the same URI concurrently.
* If given job is already running, blocks here until finished.
* Afterwards we try to read extracted data from cache. */
clapper_enhancer_proxy_await_job_start (proxy, job_id);
/* Cancelled during waiting for usage access */
if (g_cancellable_is_cancelled (data->cancellable)) {
clapper_enhancer_proxy_remove_job (proxy, job_id);
break;
}
harvest = clapper_harvest_new (); // fresh harvest for each iteration
config = clapper_enhancer_proxy_make_current_config (proxy);
if ((success = clapper_harvest_fill_from_cache (harvest, proxy, config, data->uri))
|| g_cancellable_is_cancelled (data->cancellable)) { // Check before extract
gst_clear_structure (&config);
clapper_enhancer_proxy_remove_job (proxy, job_id);
break;
}
@@ -111,10 +129,13 @@ clapper_enhancer_director_extract_in_thread (ClapperEnhancerDirectorData *data)
clapper_harvest_export_to_cache (harvest, proxy, config, data->uri);
}
gst_clear_structure (&config);
clapper_enhancer_proxy_remove_job (proxy, job_id);
break;
}
}
clapper_enhancer_proxy_remove_job (proxy, job_id);
/* Cleanup to try again with next enhancer */
g_clear_object (&harvest);
gst_clear_structure (&config);