Django + Celery & Rabbit

Hello! Today's post is the first in a series about Celery in a Django application and how to use it.

What will you gain from reading this series?

You will learn how to integrate Celery and RabbitMQ into a basic Django application.

So what will be our basic application?

It is planned to be a web service where you can upload an mp3 file and have it transcoded into ogg, wav and ac3 files.

Recently I started reading an excellent book called Two Scoops of Django: Best Practices for Django 1.8. The book is about what to do and what not to do in your Django project. I saw an opportunity to try its advice in a real project. I will start with the project layout. Normally you have something like this:

$ tree trancoder

trancoder
├── audiotranscoder
│   ├── admin.py
│   ├── apps.py
│   ├── __init__.py
│   ├── migrations
│   │   └── __init__.py
│   ├── models.py
│   ├── tests.py
│   └── views.py
└── trancoder
    ├── manage.py
    └── trancoder
        ├── __init__.py
        ├── settings.py
        ├── urls.py
        └── wsgi.py

But the authors of the book suggest changing it to something like this:

$ tree transcoder

transcoder
├── README.rst
├── requirements.txt
└── transcoder
    ├── audio_transcoder
    │   ├── admin.py
    │   ├── apps.py
    │   ├── forms.py
    │   ├── __init__.py
    │   ├── migrations
    │   │   ├── 0001_initial.py
    │   │   └── __init__.py
    │   ├── models.py
    │   ├── tests.py
    │   ├── urls.py
    │   └── views.py
    ├── config
    │   ├── __init__.py
    │   ├── settings.py
    │   ├── urls.py
    │   └── wsgi.py
    ├── manage.py
    ├── media
    └── templates
        ├── base.html
        ├── home.html
        └── upload.html

After working with this layout for some time, I have to say that I find it clear and well structured. However, two files need changes so that Django knows where to look for the settings and WSGI modules:

manage.py:

#!/usr/bin/env python
import os
import sys

if __name__ == "__main__":
    os.environ.setdefault("DJANGO_SETTINGS_MODULE", "config.settings")

    from django.core.management import execute_from_command_line

    execute_from_command_line(sys.argv)

settings.py:

# rest of settings file
ROOT_URLCONF = 'config.urls'

WSGI_APPLICATION = 'config.wsgi.application'

MEDIA_ROOT = os.path.join(BASE_DIR, 'media')

MEDIA_URL = '/media/'

The transcoder needs a file to transcode, so I created a model, a form and a view for the mp3 file.

audio_transcoder/models.py:

import uuid

from django.db import models


def unique_file_path(instance, filename):
    new_file_name = uuid.uuid4()
    return str(new_file_name)


class AudioFile(models.Model):
    name = models.CharField(max_length=100, blank=True)
    mp3_file = models.FileField(upload_to=unique_file_path)
    ogg_file = models.FileField(blank=True, upload_to=unique_file_path)
    wav_file = models.FileField(blank=True, upload_to=unique_file_path)
    ac3_file = models.FileField(blank=True, upload_to=unique_file_path)

    def __str__(self):
        return self.name

To avoid duplicate filenames among uploaded files, I give each one a unique name. The upload_to argument takes the function unique_file_path, which generates that name. The function has to accept two arguments: instance and filename.
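As a variation on unique_file_path, the sketch below keeps the original extension, so the stored file is still recognizable as an mp3. Keeping the extension is an assumption added here, not something the function above does. Django calls the function with the model instance and the original name of the uploaded file, so it can be tried in plain Python as well.

```python
import os
import uuid


def unique_file_path(instance, filename):
    """Return a unique name for an uploaded file, keeping its extension.

    Django passes the model instance and the original filename; the
    instance is not needed here. Keeping the extension is an assumption
    of this sketch.
    """
    extension = os.path.splitext(filename)[1].lower()
    return '{}{}'.format(uuid.uuid4().hex, extension)


# Plain-Python usage; no Django needed to try it out.
first = unique_file_path(None, 'My Song.MP3')
second = unique_file_path(None, 'My Song.MP3')
```

Two uploads with the same original name end up with different stored names, which is the whole point of the function.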

audio_transcoder/forms.py:

from django.forms import ModelForm
from .models import AudioFile


class AudioFileFrom(ModelForm):
    class Meta:
        model = AudioFile
        fields = ['name', 'mp3_file']

Here I have used ModelForm, which is the easiest way to generate a form for a given model.

audio_transcoder/views.py:

from django.views.generic.edit import FormView
from django.http import HttpResponseRedirect
from django.core.urlresolvers import reverse

from .forms import AudioFileFrom
from .models import AudioFile


class UploadAudioFileView(FormView):
    template_name = 'upload.html'
    form_class = AudioFileFrom

    def form_valid(self, form):
        audio_file = AudioFile(
            name=self.get_form_kwargs().get('data')['name'],
            mp3_file=self.get_form_kwargs().get('files')['mp3_file']
        )
        audio_file.save()

        return HttpResponseRedirect(self.get_success_url())

    def get_success_url(self):
        return reverse('home')

In UploadAudioFileView, once the form is valid, I take name and mp3_file from the form submitted by the user and save them in the corresponding model. I had a hard time figuring out where to put the code that saves the model. At first I wanted the view to work as a plain generic CBV, so instead of form_valid I passed an additional success_url argument to the class, but that didn't save the AudioFile. Moving form_valid to AudioFileFrom didn't help either. After some more research I found this GoDjango tutorial. If you know a better way to do this, please let me know!

The code that I have made so far is available on github.

In my application transcoding will be performed by ffmpeg. Why is that?

First of all, it's free and open source, so I don't have to pay to use it, and I like using open source tools. Second, ffmpeg is more than enough for my task. It has a lot of features, including recording, streaming and transcoding of both video and audio.

So how do you use it? For this project, ffmpeg needs to be compiled with libmp3lame for mp3 encoding, with libvorbis for the ogg format, and with non-free enabled for ac3. There is a guide on the ffmpeg site on how to compile ffmpeg. Also, make sure you have cookies and tea with you, because the installation can take up to 30 minutes.

After everything is set up, all you need to do is:

$ ffmpeg -i mp3_file.mp3 wav_file.wav
$ ffmpeg -i mp3_file.mp3 ogg_file.ogg
$ ffmpeg -i mp3_file.mp3 ac3_file.ac3

The commands above transcode an mp3 file into the given format. In my application one Celery task will call ffmpeg through subprocess, and then another Celery task will save the output of the first task to the Django model.
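From Python, the same commands can be run through subprocess. This is a minimal sketch, assuming ffmpeg is on the PATH. The helper names build_ffmpeg_command and transcode are hypothetical, and the -y flag (overwrite the output file without asking) is added here so that the call never blocks on a prompt.

```python
import subprocess


def build_ffmpeg_command(input_path, output_path, ffmpeg_path='ffmpeg'):
    """Build the argument list for one transcoding run.

    ffmpeg picks the output format from the output file's extension.
    """
    return [ffmpeg_path, '-y', '-i', input_path, output_path]


def transcode(input_path, output_path):
    """Run ffmpeg and raise CalledProcessError if it exits with an error."""
    subprocess.check_call(build_ffmpeg_command(input_path, output_path))


# Building the command does not run ffmpeg yet.
command = build_ffmpeg_command('mp3_file.mp3', 'ogg_file.ogg')
```

Passing the arguments as a list, rather than as one shell string, means filenames with spaces need no quoting.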

First: why do we need Celery? Imagine that a user uploads an mp3 file to the application and the file is transcoded to the other formats during form validation. The problem is that the user has to wait for the task to finish. The user sends a request, the server waits for ffmpeg to transcode the uploaded file, and only then sends the response back. At first glance this may look correct. But imagine that there are big files to transcode, or many formats to transcode into. The user would have to wait a long time. To avoid this I will use a Celery task with the RabbitMQ broker to do the transcoding in the background.

So what exactly is Celery? From the docs:

Celery is a simple, flexible and reliable distributed system to process vast amounts of messages, while providing operations with the tools required to maintain such a system.

What does it mean? It is a task manager that handles messages (tasks) in whatever form you like. A task is a function that could calculate something, handle some logic or, for example, transcode files. But Celery without a message broker is useless. Celery supports a lot of message brokers, but RabbitMQ is supported out of the box, so I will use it. You may ask: what is RabbitMQ? It is a broker: it passes messages from the sender (the Django application) to the receiver (Celery).
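The division of roles can be illustrated without Celery or RabbitMQ at all. The sketch below is only an analogy built on the standard library: a queue.Queue stands in for the broker, a thread stands in for the Celery worker, and fake_transcode is a hypothetical placeholder for the real work.

```python
import queue
import threading

broker = queue.Queue()   # stands in for RabbitMQ
results = {}             # stands in for the database


def fake_transcode(file_id):
    """Hypothetical placeholder for a slow transcoding job."""
    return 'file-{}.ogg'.format(file_id)


def worker():
    """Stands in for a Celery worker: take messages until told to stop."""
    while True:
        file_id = broker.get()
        if file_id is None:          # sentinel value: shut down
            broker.task_done()
            break
        results[file_id] = fake_transcode(file_id)
        broker.task_done()


thread = threading.Thread(target=worker)
thread.start()

# The "Django view" only enqueues a message and returns immediately.
for file_id in (1, 2, 3):
    broker.put(file_id)

broker.put(None)   # ask the worker to stop
broker.join()      # wait until every message has been handled
thread.join()
```

The sender never calls fake_transcode itself. It only puts a message on the queue, which is why the user would not have to wait for the transcoding to finish.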

So let's get to the code:

First, install RabbitMQ and enable the plugin that displays its status:

$ sudo apt-get install rabbitmq-server
$ sudo rabbitmq-plugins enable rabbitmq_management
$ sudo rabbitmqctl stop
$ sudo invoke-rc.d rabbitmq-server start

Add an admin user to rabbitmq-server:

$ sudo rabbitmqctl add_user admin admin
$ sudo rabbitmqctl set_user_tags admin administrator
$ sudo rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"

Right now you can go to localhost:15672 to see:

Rabbit mq panel

Now it's time to get Celery working. First I will create a new folder called taskapp, where I will put my Celery configuration:

├── audio_transcoder
├── taskapp
│ ├── celery.py
│ ├── __init__.py
│ └── tasks.py
# some other files ...

The configuration lives in celery.py:

from __future__ import absolute_import

from celery import Celery

app = Celery('transcoder',
             broker='amqp://admin:admin@localhost//',
             include=['taskapp.tasks'])

if __name__ == '__main__':
    app.start()

The first line is there for Python 2: with absolute imports, from celery import Celery picks up the installed Celery package rather than this celery.py file. In the app configuration I specify the name of the application (transcoder), the URL of the broker together with the credentials (broker='amqp://admin:admin@localhost//'), and the modules containing tasks (include=['taskapp.tasks']).
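To see what each part of the broker URL carries, it can be taken apart with the standard library. This is only an illustration, since Celery does its own parsing. Reading the part after the first slash as the virtual host is my understanding of the amqp URL convention, so treat it as an assumption.

```python
from urllib.parse import urlsplit

broker_url = 'amqp://admin:admin@localhost//'
parts = urlsplit(broker_url)

transport = parts.scheme      # protocol used to talk to the broker
username = parts.username     # RabbitMQ user created with rabbitmqctl
password = parts.password
host = parts.hostname
# Assumption: everything after the first "/" names the virtual host,
# so the trailing "//" selects the default virtual host "/".
virtual_host = parts.path[1:]
```

That matches the set_permissions -p / command used earlier, which grants the admin user access to the "/" virtual host.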

Then in tasks.py I added the task itself:

from __future__ import absolute_import
import os
import os.path
import subprocess

from taskapp.celery import app
from audio_transcoder.models import AudioFile
import config.settings as settings


@app.task
def transcode_mp3(mp3_id):
    audio_file = AudioFile.objects.get(id=mp3_id)
    input_file_path = audio_file.mp3_file.path
    filename = os.path.basename(input_file_path)

    # For every format: the name relative to MEDIA_ROOT and the full path.
    ogg_output_file_name = os.path.join('transcoded', '{}.ogg'.format(filename))
    ogg_output_file_path = os.path.join(settings.MEDIA_ROOT, ogg_output_file_name)

    ac3_output_file_name = os.path.join('transcoded', '{}.ac3'.format(filename))
    ac3_output_file_path = os.path.join(settings.MEDIA_ROOT, ac3_output_file_name)

    wav_output_file_name = os.path.join('transcoded', '{}.wav'.format(filename))
    wav_output_file_path = os.path.join(settings.MEDIA_ROOT, wav_output_file_name)

    # Create the output folder on first use.
    if not os.path.isdir(os.path.dirname(ogg_output_file_path)):
        os.makedirs(os.path.dirname(ogg_output_file_path))

    # One ffmpeg run with three output files.
    subprocess.call([
        settings.FFMPEG_PATH,
        '-i',
        input_file_path,
        ogg_output_file_path,
        ac3_output_file_path,
        wav_output_file_path
    ])

    # Store the relative names on the model.
    audio_file.ogg_file = ogg_output_file_name
    audio_file.ac3_file = ac3_output_file_name
    audio_file.wav_file = wav_output_file_name
    audio_file.save()

What do we have here? Let's start with the transcode_mp3 function. It has the @app.task decorator, which tells Celery that it is a task. Its argument is mp3_id. Given the id of the newly uploaded file, the task gets the AudioFile model from the database and retrieves the path to the uploaded mp3 file. Then it generates file names and paths for every format: ogg, ac3 and wav. Right after that it checks whether the transcoded folder exists under media and creates it if not. Calling subprocess is the same as running ffmpeg -i mp3_file.mp3 ogg_file.ogg ac3_file.ac3 wav_file.wav. At the end the task saves the paths of the outputs to the database.
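The three near-identical pairs of name and path lines could be folded into one helper. This is a minimal sketch: build_output_paths is a hypothetical name, and the media root and input path used below are placeholders. For each format it returns the name relative to the media root (what gets stored on the model) and the full path (what is handed to ffmpeg).

```python
import os


def build_output_paths(media_root, input_file_path,
                       formats=('ogg', 'ac3', 'wav')):
    """Map each format to (name relative to media_root, full path)."""
    filename = os.path.basename(input_file_path)
    paths = {}
    for fmt in formats:
        relative_name = os.path.join('transcoded',
                                     '{}.{}'.format(filename, fmt))
        paths[fmt] = (relative_name, os.path.join(media_root, relative_name))
    return paths


# Placeholder values, standing in for settings.MEDIA_ROOT and mp3_file.path.
paths = build_output_paths('/srv/media', '/srv/media/3f2a')
ogg_name, ogg_path = paths['ogg']
```

Adding another output format then means adding one entry to formats instead of two more lines in the task.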

The task itself is called in the view:

from taskapp.tasks import transcode_mp3


class UploadAudioFileView(FormView):
    # ...
    def form_valid(self, form):
        # ...
        audio_file.save()
        transcode_mp3.delay(audio_file.id)
        # ...

Everything works as expected, and I added a redirect to the detail view of the audio file after a successful upload. The problem is that the transcoding is not as fast as the redirect.

This and other bugs will be fixed, and small improvements added, in the last post of this series next week. Thanks for reading! I really appreciate your feedback, so please comment or write me an email. The code is available in this github repo.

Audio File detail view

The problem was that after a successful upload Django redirects to the detail view of the uploaded file. The HTML template of this view expects ac3_file to be there, but FFmpeg is still transcoding it. So I came up with a solution:

1. First, I added a new field to the AudioFile model called was_processed to indicate whether the file has been processed:

class AudioFile(models.Model):
    # ....
    was_processed = models.BooleanField(default=False)

By default, this field has value False.

2. Then in my tasks module I added a signal handler that runs after every task:

from celery.signals import task_postrun


@task_postrun.connect
def task_executed_handler(sender=None, body=None, **kwargs):
    audio_file = AudioFile.objects.get(id=kwargs['args'][0])
    audio_file.was_processed = True
    audio_file.save()

In this handler the id of the AudioFile object is taken from kwargs. After the file is retrieved from the database, the was_processed flag is set.

3. Lastly, in my audiofile_detail.html I added this code:


</ul>
</ul>

Logging

Right now everything works great, but what if something goes wrong? To make sure that I will be able to find the issue, I need logging. This is especially valuable for Celery, because Django doesn't show output from Celery, since it is a separate application. To set up basic logging I only need to add a few things in tasks.py:

from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)


@app.task
def transcode_mp3(mp3_id):
    # ...

    logger.debug(
        'Created output files: %s, %s, %s.',
        ogg_output_file_path,
        ac3_output_file_path,
        wav_output_file_path
    )

    logger.info('Started transcoding.')
    # transcoding here
    logger.info('End of transcoding.')

    # rest of program ...

Thanks to that I can see in my console:

[2016-03-19 09:55:07,184: INFO/Worker-4] taskapp.tasks.transcode_mp3[b6ca93d4-e58c-496f-b8e5-4ba493b8a92a]: Started transcoding.
# transcoding ...
[2016-03-19 09:55:11,837: INFO/Worker-4] taskapp.tasks.transcode_mp3[b6ca93d4-e58c-496f-b8e5-4ba493b8a92a]: End of transcoding.

Summary

I made a basic transcoder application that uses FFmpeg, Django, Celery and RabbitMQ. I learned quite a bit about how Celery works with RabbitMQ and Django, and along the way I stumbled upon some useful blog posts. I also see some issues with my solutions, for instance today's AudioFile detail view. I'm thinking about other ways to solve this problem, because right now I need 2 database operations for one file. Maybe you know a solution? I'm really keen to hear your view on this issue, or any other comments, so feel free to write a comment or send me an email! Thanks to everyone who gives me feedback, I really appreciate it! The code for this blog can be found on github.