Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parallel download with pthreads #51

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions configure.ac
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ else
AC_SUBST(CURL_LIBS)
fi

PTHREAD_LIBS=-lpthread
AC_SUBST(PTHREAD_CFLAGS)
AC_SUBST(PTHREAD_LIBS)

#AC_ARG_WITH(taglib, [ --without-taglib disable taglib support])
AC_ARG_WITH(taglib, AC_HELP_STRING([--without-taglib], [disable taglib support])])
if test "x$with_taglib" != "xno"; then
Expand Down
6 changes: 4 additions & 2 deletions src/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
#
AM_CPPFLAGS = $(GLIBS_CFLAGS) $(CURL_CFLAGS)
AM_CPPFLAGS = $(GLIBS_CFLAGS) $(CURL_CFLAGS) $(PTHREAD_CFLAGS)

bin_PROGRAMS = castget

Expand Down Expand Up @@ -47,4 +47,6 @@ castget_SOURCES = \
castget_LDADD = \
$(CURL_LIBS) \
$(GLIBS_LIBS) \
$(TAGLIB_LIBS)
$(TAGLIB_LIBS) \
$(PTHREAD_LIBS) \

133 changes: 109 additions & 24 deletions src/channel.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,10 @@
#include "urlget.h"
#include "utils.h"

#include <errno.h>
#include <glib/gprintf.h>
#include <pthread.h>
#include <stdio.h>
#include <string.h>
#include <sys/stat.h>
#include <sys/types.h>
Expand Down Expand Up @@ -278,6 +281,20 @@ static int _do_download(channel *c, channel_info *channel_info, rss_item *item,
return download_failed;
}

void *_do_download_thread(void *arg)
{
download_data *data = (download_data *)arg;
// Resume and progress bars are disabled, because
// those will be tricky with threading.
if (data->show_progress_bar)
printf("Downloading: %s\n", data->item->title);
data->result = _do_download(data->c, data->channel_info, data->item,
data->user_data, data->cb, 0, data->debug, 0);
if (data->show_progress_bar)
printf("Download Complete: %s\n", data->item->title);
pthread_exit(NULL);
}

static int _do_catchup(channel *c, channel_info *channel_info, rss_item *item,
void *user_data, channel_callback cb)
{
Expand All @@ -292,17 +309,32 @@ static int _do_catchup(channel *c, channel_info *channel_info, rss_item *item,
return 0;
}

void *_do_catchup_thread(void *arg)
{
catchup_data *data = (catchup_data *)arg;
data->result = _do_catchup(data->c, data->channel_info, data->item,
data->user_data, data->cb);
pthread_exit(NULL);
}

int channel_update(channel *c, void *user_data, channel_callback cb,
int no_download, int no_mark_read, int first_only,
int resume, enclosure_filter *filter, int debug,
int show_progress_bar)
{
int i, download_failed;
rss_file *f;
int num_catchup = 0;
int num_download = 0;
void *status;
int t_result;

/* Retrieve the RSS file. */
f = _get_rss(c, user_data, cb, debug);

catchup_data catchup_items[f->num_items];
download_data download_items[f->num_items];

if (!f)
return 1;

Expand All @@ -317,35 +349,88 @@ int channel_update(channel *c, void *user_data, channel_callback cb,
item = f->items[i];

if (!filter || _enclosure_pattern_match(filter, item->enclosure)) {
if (no_download)
download_failed =
_do_catchup(c, &(f->channel_info), item, user_data, cb);
else
download_failed =
_do_download(c, &(f->channel_info), item, user_data, cb, resume,
debug, show_progress_bar);

if (download_failed)
break;

if (!no_mark_read) {
/* Mark enclosure as downloaded and immediately save channel
file to ensure that it reflects the change. */
g_hash_table_insert(c->downloaded_enclosures,
f->items[i]->enclosure->url,
(gpointer)get_rfc822_time());

_cast_channel_save(c, debug);
if (no_download) {
// download_failed =
//_do_catchup(c, &(f->channel_info), item, user_data, cb);
catchup_items[num_catchup] =
(catchup_data){ .c = c,
.channel_info = &(f->channel_info),
.item = item,
.user_data = user_data,
.cb = cb };
num_catchup++;
} else {
// download_failed =
//_do_download(c, &(f->channel_info), item, user_data, cb, resume,
// debug, show_progress_bar);
download_items[num_download] =
(download_data){ .c = c,
.channel_info = &(f->channel_info),
.item = item,
.user_data = user_data,
.cb = cb,
.resume = resume,
.debug = debug,
.show_progress_bar = show_progress_bar };
num_download++;
}

/* If we have been instructed to deal only with the first
available enclosure, it is time to break out of the loop. */
if (first_only)
break;
}

/* If we have been instructed to deal only with the first
available enclosure, it is time to break out of the loop. */
if (first_only)
break;
}
}

pthread_t catchup_threads[num_catchup];

for (i = 0; i < num_catchup; i++) {
pthread_create(&catchup_threads[i], NULL, _do_catchup_thread,
(void *)&catchup_items[i]);
}

pthread_t download_threads[num_download];
for (i = 0; i < num_download; i++) {
pthread_create(&download_threads[i], NULL, _do_download_thread,
(void *)&download_items[i]);
}

for (i = 0; i < num_catchup; i++) {
t_result = pthread_join(catchup_threads[i], NULL);
if (t_result != 0)
printf(strerror(t_result));
if (catchup_items[i].result != 0) {
download_failed = 1;
break;
}
if (!no_mark_read) {
/* Mark enclosure as downloaded and immediately save channel
file to ensure that it reflects the change. */
g_hash_table_insert(c->downloaded_enclosures, f->items[i]->enclosure->url,
(gpointer)get_rfc822_time());

_cast_channel_save(c, debug);
}
}

for (i = 0; i < num_download; i++) {
t_result = pthread_join(download_threads[i], NULL);
if (t_result != 0)
printf(strerror(t_result));
if (download_items[i].result != 0 || download_failed) {
download_failed = 1;
printf("Download failed");
break;
}
/* Mark enclosure as downloaded and immediately save channel
file to ensure that it reflects the change. */
g_hash_table_insert(c->downloaded_enclosures, f->items[i]->enclosure->url,
(gpointer)get_rfc822_time());

_cast_channel_save(c, debug);
}

if (!no_mark_read) {
/* Update the RSS last fetched time and save the channel file again. */

Expand Down
23 changes: 23 additions & 0 deletions src/channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,4 +72,27 @@ int channel_update(channel *c, void *user_data, channel_callback cb,
enclosure_filter *enclosure_filter_new(const gchar *pattern, gboolean caseless);
void enclosure_filter_free(enclosure_filter *e);

typedef struct _rss_item rss_item;

typedef struct _download_data {
channel *c;
channel_info *channel_info;
rss_item *item;
void *user_data;
channel_callback cb;
int resume;
int debug;
int show_progress_bar;
int result;
} download_data;

typedef struct _catchup_data {
channel *c;
channel_info *channel_info;
rss_item *item;
void *user_data;
channel_callback cb;
int result;
} catchup_data;

#endif /* CHANNEL_H */