Merge pull request #591 from Rafostar/proxy-jobs

clapper: Add simple enhancer proxy jobs management
Rafał Dzięgiel
2025-11-10 18:37:24 +01:00
committed by GitHub
3 changed files with 108 additions and 2 deletions


@@ -53,4 +53,10 @@ GstStructure * clapper_enhancer_proxy_make_current_config (ClapperEnhancerProxy
G_GNUC_INTERNAL
void clapper_enhancer_proxy_apply_config_to_enhancer (ClapperEnhancerProxy *proxy, const GstStructure *config, GObject *enhancer);
G_GNUC_INTERNAL
void clapper_enhancer_proxy_await_job_start (ClapperEnhancerProxy *proxy, guint job_id);
G_GNUC_INTERNAL
void clapper_enhancer_proxy_remove_job (ClapperEnhancerProxy *proxy, guint job_id);
G_END_DECLS


@@ -95,6 +95,10 @@ struct _ClapperEnhancerProxy
* so store schema instead */
GSettingsSchema *schema;
gboolean schema_init_done;
GArray *jobs;
GMutex job_lock;
GCond job_cond;
};
enum
@@ -599,6 +603,75 @@ clapper_enhancer_proxy_apply_config_to_enhancer (ClapperEnhancerProxy *self, con
GST_DEBUG_OBJECT (self, "Enhancer config applied");
}
/* Must be called with "job_lock" held */
static gboolean
_find_job_unlocked (ClapperEnhancerProxy *self, guint job_id, guint *index)
{
guint i;
for (i = 0; i < self->jobs->len; ++i) {
guint *stored_id = &g_array_index (self->jobs, guint, i);
if (job_id == *stored_id) {
if (index)
*index = i;
return TRUE;
}
}
return FALSE;
}
/*
 * clapper_enhancer_proxy_await_job_start:
 *
 * Prevents starting a job with the same ID concurrently,
 * while still allowing jobs with unique IDs to run in parallel.
 *
 * After each start, `clapper_enhancer_proxy_remove_job` must be
 * called once the job is considered done.
 */
void
clapper_enhancer_proxy_await_job_start (ClapperEnhancerProxy *self, guint job_id)
{
GST_LOG_OBJECT (self, "Requested job ID: %u", job_id);
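/* Lazily create the job array and its locking primitives on first use */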
GST_OBJECT_LOCK (self);
if (!self->jobs) {
self->jobs = g_array_new (FALSE, FALSE, sizeof (guint));
g_mutex_init (&self->job_lock);
g_cond_init (&self->job_cond);
}
GST_OBJECT_UNLOCK (self);
g_mutex_lock (&self->job_lock);
while (_find_job_unlocked (self, job_id, NULL))
g_cond_wait (&self->job_cond, &self->job_lock);
g_array_append_val (self->jobs, job_id);
GST_LOG_OBJECT (self, "Added job ID: %u", job_id);
g_mutex_unlock (&self->job_lock);
}
void
clapper_enhancer_proxy_remove_job (ClapperEnhancerProxy *self, guint job_id)
{
guint index;
g_mutex_lock (&self->job_lock);
if (G_LIKELY (_find_job_unlocked (self, job_id, &index))) {
g_array_remove_index_fast (self->jobs, index);
GST_LOG_OBJECT (self, "Removed job ID: %u", job_id);
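/* Wake up all waiters; each one re-checks whether its own job ID is now free */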
g_cond_broadcast (&self->job_cond);
} else {
GST_ERROR_OBJECT (self, "Requested removal of nonexistent job ID: %u", job_id);
}
g_mutex_unlock (&self->job_lock);
}
/**
* clapper_enhancer_proxy_get_friendly_name:
* @proxy: a #ClapperEnhancerProxy
@@ -1188,6 +1261,12 @@ clapper_enhancer_proxy_finalize (GObject *object)
gst_clear_structure (&self->local_config);
g_clear_pointer (&self->schema, g_settings_schema_unref);
if (self->jobs) {
g_array_unref (self->jobs);
g_mutex_clear (&self->job_lock);
g_cond_clear (&self->job_cond);
}
G_OBJECT_CLASS (parent_class)->finalize (object);
}
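
For reference, a minimal sketch of how callers are expected to pair these two calls (the run_job() wrapper is hypothetical; only the two proxy functions come from this commit):

static void
run_job (ClapperEnhancerProxy *proxy, guint job_id)
{
  /* Blocks while another thread runs a job with the same ID */
  clapper_enhancer_proxy_await_job_start (proxy, job_id);

  /* ... perform the actual work here ... */

  /* Must always be called once the job is done, even on failure,
   * otherwise threads waiting on this ID would block forever */
  clapper_enhancer_proxy_remove_job (proxy, job_id);
}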


@@ -66,6 +66,8 @@ clapper_enhancer_director_extract_in_thread (ClapperEnhancerDirectorData *data)
ClapperEnhancerDirector *self = data->director;
GList *el;
ClapperHarvest *harvest = NULL;
gchar *uri_str;
guint job_id;
gboolean success = FALSE;
GST_DEBUG_OBJECT (self, "Extraction start");
@@ -74,20 +76,36 @@ clapper_enhancer_director_extract_in_thread (ClapperEnhancerDirectorData *data)
if (g_cancellable_is_cancelled (data->cancellable))
return NULL;
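/* Hash of the URI string serves as the job ID, so identical URIs map to the same job */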
uri_str = g_uri_to_string (data->uri);
job_id = g_str_hash (uri_str);
GST_DEBUG_OBJECT (self, "Extracting URI: \"%s\", compatible enhancers: %u",
uri_str, g_list_length (data->filtered_proxies));
g_free (uri_str);
for (el = data->filtered_proxies; el; el = g_list_next (el)) {
ClapperEnhancerProxy *proxy = CLAPPER_ENHANCER_PROXY_CAST (el->data);
ClapperExtractable *extractable = NULL;
GstStructure *config;
/* Ensures that we do not start extraction of the same URI concurrently.
 * If such a job is already running, block here until it finishes,
 * then try to read the extracted data from cache. */
clapper_enhancer_proxy_await_job_start (proxy, job_id);
/* Cancelled while waiting for the job to start */
if (g_cancellable_is_cancelled (data->cancellable)) {
clapper_enhancer_proxy_remove_job (proxy, job_id);
break;
}
harvest = clapper_harvest_new (); /* Fresh harvest for each iteration */
config = clapper_enhancer_proxy_make_current_config (proxy);
if ((success = clapper_harvest_fill_from_cache (harvest, proxy, config, data->uri))
|| g_cancellable_is_cancelled (data->cancellable)) { /* Check before extract */
gst_clear_structure (&config);
clapper_enhancer_proxy_remove_job (proxy, job_id);
break;
}
@@ -111,10 +129,13 @@ clapper_enhancer_director_extract_in_thread (ClapperEnhancerDirectorData *data)
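/* Export to cache before removing the job, so threads waiting on this ID can be filled from it */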
clapper_harvest_export_to_cache (harvest, proxy, config, data->uri);
}
gst_clear_structure (&config);
clapper_enhancer_proxy_remove_job (proxy, job_id);
break;
}
}
clapper_enhancer_proxy_remove_job (proxy, job_id);
/* Cleanup to try again with next enhancer */
g_clear_object (&harvest);
gst_clear_structure (&config);
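
Taken together: the director hashes the URI to derive the job ID, so two threads extracting the same URI serialize on one job slot, and the second thread can usually be served from cache. A rough sketch of what that second thread observes (variable names are illustrative; the GLib and Clapper calls are the ones used above):

/* Same URI string produces the same job ID */
gchar *uri_str = g_uri_to_string (uri);
guint job_id = g_str_hash (uri_str);
g_free (uri_str);

/* Blocks until the first thread removes its job... */
clapper_enhancer_proxy_await_job_start (proxy, job_id);

/* ...which it does only after exporting its results, so this
 * cache read usually succeeds without extracting again */
success = clapper_harvest_fill_from_cache (harvest, proxy, config, uri);

clapper_enhancer_proxy_remove_job (proxy, job_id);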