Django + Celery & Rabbit
Hello! Today's post is the first in a series about using Celery in a Django application.
What will you gain from reading this series?
You will learn how to integrate Celery and RabbitMQ into a basic Django application.
So what will our basic application be?
It is planned to be a web service where you can upload an mp3 file and have it transcoded into ogg, wav and ac3 files.
Recently I started reading an excellent book called Two Scoops of Django: Best Practices for Django 1.8. The book is about what to do and what not to do in your Django project. I saw an opportunity to try advice from the book in a real project. I will start with the project layout. Normally you have something like this:
$ tree trancoder
trancoder
├── audiotranscoder
│   ├── admin.py
│   ├── apps.py
│   ├── __init__.py
│   ├── migrations
│   │   └── __init__.py
│   ├── models.py
│   ├── tests.py
│   └── views.py
└── trancoder
    ├── manage.py
    └── trancoder
        ├── __init__.py
        ├── settings.py
        ├── urls.py
        └── wsgi.py
But the book's authors suggest changing it to something like this:
$ tree transcoder
transcoder
├── README.rst
├── requirements.txt
└── transcoder
    ├── audio_transcoder
    │   ├── admin.py
    │   ├── apps.py
    │   ├── forms.py
    │   ├── __init__.py
    │   ├── migrations
    │   │   ├── 0001_initial.py
    │   │   └── __init__.py
    │   ├── models.py
    │   ├── tests.py
    │   ├── urls.py
    │   └── views.py
    ├── config
    │   ├── __init__.py
    │   ├── settings.py
    │   ├── urls.py
    │   └── wsgi.py
    ├── manage.py
    ├── media
    └── templates
        ├── base.html
        ├── home.html
        └── upload.html
After working with this layout for some time, I have to say that it feels convenient and well structured. However, two files need changes so that Django knows where to look for the settings and WSGI modules:
manage.py:
#!/usr/bin/env python
import os
import sys

if __name__ == "__main__":
    os.environ.setdefault("DJANGO_SETTINGS_MODULE", "config.settings")
    from django.core.management import execute_from_command_line
    execute_from_command_line(sys.argv)
settings.py:
# rest of settings file
ROOT_URLCONF = 'config.urls'
WSGI_APPLICATION = 'config.wsgi.application'
MEDIA_ROOT = os.path.join(BASE_DIR, 'media')
MEDIA_URL = '/media/'
The transcoder needs a file to transcode, so I created a model, a form and a view for the mp3 file.
audio_transcoder/models.py:
import uuid
from django.db import models


def unique_file_path(instance, filename):
    new_file_name = uuid.uuid4()
    return str(new_file_name)


class AudioFile(models.Model):
    name = models.CharField(max_length=100, blank=True)
    mp3_file = models.FileField(upload_to=unique_file_path)
    ogg_file = models.FileField(blank=True, upload_to=unique_file_path)
    wav_file = models.FileField(blank=True, upload_to=unique_file_path)
    ac3_file = models.FileField(blank=True, upload_to=unique_file_path)

    def __str__(self):
        return self.name
To avoid duplicate filenames among uploaded files, I changed their names to be unique. The upload_to argument takes the function unique_file_path, which generates a unique name. This function has to take two arguments: instance and filename.
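Django calls the upload_to function with the model instance and the original filename, and uses the returned string as the storage path. As a minimal sketch (a variant, not the code used in this project), the function below also keeps the original file extension, which unique_file_path drops. The name unique_file_path_with_ext is hypothetical.

```python
import os.path
import uuid


def unique_file_path_with_ext(instance, filename):
    """Return a random, collision-resistant name that keeps the extension.

    Hypothetical variant of unique_file_path. `instance` is unused here,
    but Django passes it to every upload_to callable.
    """
    _, extension = os.path.splitext(filename)
    return '{}{}'.format(uuid.uuid4(), extension.lower())


# Plain-Python usage; no Django model is needed to try it out.
print(unique_file_path_with_ext(None, 'My Song.MP3'))
```

Each call returns a different name, so two uploads of the same file never collide.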
audio_transcoder/forms.py:
from django.forms import ModelForm
from .models import AudioFile


class AudioFileFrom(ModelForm):
    class Meta:
        model = AudioFile
        fields = ['name', 'mp3_file']
Here I used ModelForm, which is the easiest way to generate a form for a given model.
audio_transcoder/views.py:
from django.views.generic.edit import FormView
from django.http import HttpResponseRedirect
# Django 1.8 location; on Django 2.0+ import reverse from django.urls instead.
from django.core.urlresolvers import reverse
from .forms import AudioFileFrom
from .models import AudioFile


class UploadAudioFileView(FormView):
    template_name = 'upload.html'
    form_class = AudioFileFrom

    def form_valid(self, form):
        # Build the model from the submitted data and uploaded file.
        audio_file = AudioFile(
            name=self.get_form_kwargs().get('data')['name'],
            mp3_file=self.get_form_kwargs().get('files')['mp3_file']
        )
        audio_file.save()
        return HttpResponseRedirect(self.get_success_url())

    def get_success_url(self):
        return reverse('home')
In UploadAudioFileView, during form validation I take name and mp3_file from the form submitted by the user and save them in the corresponding model. I had a hard time figuring out where to put the model saving. At first I wanted the view to work as a generic CBV, so instead of form_valid I passed an additional argument to the class, success_url, but it didn't save the AudioFile. Moving form_valid to AudioFileFrom didn't help either.
After some more research I found this GoDjango tutorial. If you have a better way to do this, please let me know!
The code that I have written so far is available on GitHub.
In my application transcoding will be performed by ffmpeg. Why is that?
First of all, it's free and open source, so I don't have to pay to use it, and I like using open source tools. Second, ffmpeg is more than enough for my task. It has a lot of features, including recording, streaming and transcoding of both video and audio.
So how do you use it? For this project, ffmpeg needs to be compiled with libmp3lame for mp3 encoding, libvorbis for the ogg format, and with non-free enabled for ac3. There is a guide on the ffmpeg site on how to compile ffmpeg. Also, make sure that you have cookies and tea with you, because the installation can take up to 30 minutes.
After everything is set up, all you need to do is:
$ ffmpeg -i mp3_file.mp3 wav_file.wav
$ ffmpeg -i mp3_file.mp3 ogg_file.ogg
$ ffmpeg -i mp3_file.mp3 ac3_file.ac3
The commands above transcode the mp3 file into the given formats. In my application one Celery task will call ffmpeg through subprocess, then another Celery task will save the output of the first task to a Django model.
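The same commands can be driven from Python with subprocess. The following is a minimal sketch, assuming ffmpeg is on the PATH; the helper names build_ffmpeg_command and transcode are hypothetical and not part of the project code.

```python
import subprocess


def build_ffmpeg_command(input_path, output_path, ffmpeg_path='ffmpeg'):
    """Build the argument list for `ffmpeg -i <input> <output>`.

    ffmpeg picks the output format from the output file's extension.
    """
    return [ffmpeg_path, '-i', input_path, output_path]


def transcode(input_path, output_path, ffmpeg_path='ffmpeg'):
    """Run ffmpeg and raise CalledProcessError if it exits with an error."""
    subprocess.check_call(build_ffmpeg_command(input_path, output_path, ffmpeg_path))


# Printing the commands does not require ffmpeg to be installed.
for extension in ('wav', 'ogg', 'ac3'):
    print(build_ffmpeg_command('mp3_file.mp3', '{0}_file.{0}'.format(extension)))
```

Passing the arguments as a list avoids shell quoting problems with file names that contain spaces.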
First: why do we need Celery? Imagine that a user uploads an mp3 file to the application and the file is transcoded to other formats during form validation. The problem is that the user has to wait for the task to finish: the user sends a request, ffmpeg transcodes the uploaded file to the other formats, and only then is the response sent back. At first glance this may look fine. But imagine that there are big files to transcode or a lot of target formats. The user will have to wait a long time. To avoid this I will use a Celery task with a RabbitMQ broker to do the transcoding in the background.
So what exactly is Celery? From the docs:
Celery is a simple, flexible and reliable distributed system to process vast amounts of messages, while providing operations with the tools required to maintain such a system.
What does it mean? It is a task manager that handles messages (tasks) in whatever form you like. A task is a function that could calculate something, handle some logic or maybe transcode files. But Celery without a message broker is useless. Celery supports a lot of message brokers, but RabbitMQ is supported out of the box, so I will use it. You may ask: what is RabbitMQ? It is a broker: it passes a message from the sender (the Django application) to the receiver (Celery).
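To make the sender, broker and receiver roles concrete, here is a toy sketch that uses only the Python standard library. It is not Celery or RabbitMQ: a queue.Queue stands in for the broker, a thread stands in for the worker, and all names are made up for illustration.

```python
import queue
import threading

broker = queue.Queue()      # stands in for RabbitMQ
results = []                # stands in for the database


def worker():
    """Stands in for a Celery worker: take messages until told to stop."""
    while True:
        message = broker.get()
        if message is None:          # sentinel: shut the worker down
            break
        task_name, argument = message
        results.append('{} done for id {}'.format(task_name, argument))


def send_task(task_name, argument):
    """Stands in for the Django view: enqueue the message and return at once."""
    broker.put((task_name, argument))


worker_thread = threading.Thread(target=worker)
worker_thread.start()
send_task('transcode_mp3', 1)
send_task('transcode_mp3', 2)
broker.put(None)
worker_thread.join()
print(results)
```

The important point is that send_task returns immediately. The sender never waits for the work itself, only for the message to be queued.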
So let's go to the code.
First, install RabbitMQ with the plugin that displays its status:
$ sudo apt-get install rabbitmq-server
$ sudo rabbitmq-plugins enable rabbitmq_management
$ sudo rabbitmqctl stop
$ sudo invoke-rc.d rabbitmq-server start
Add an admin user to rabbitmq-server:
$ sudo rabbitmqctl add_user admin admin
$ sudo rabbitmqctl set_user_tags admin administrator
$ sudo rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
Now you can go to localhost:15672 to see the RabbitMQ management interface.
Now it's time to get Celery working. First I will create a new folder called taskapp, where I will put my Celery configuration:
├── audio_transcoder
├── taskapp
│   ├── celery.py
│   ├── __init__.py
│   └── tasks.py
│   # some other files ...
The configuration lives in celery.py:
from __future__ import absolute_import
from celery import Celery

app = Celery('transcoder',
             broker='amqp://admin:admin@localhost//',
             include=['taskapp.tasks'])

if __name__ == '__main__':
    app.start()
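The first line is for backward compatibility with Python 2: absolute imports ensure that from celery import Celery loads the Celery library rather than this celery.py module. In the app configuration I specify the name of the application (transcoder), the URL, with credentials, where the broker will be listening (broker='amqp://admin:admin@localhost//'), and the modules containing tasks (include=['taskapp.tasks']).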
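Hard-coding the broker credentials is fine for a local experiment. As a hedged sketch of one alternative, the URL could be assembled from environment variables. The RABBITMQ_* variable names and the helper broker_url_from_env are assumptions made for this example; the defaults mirror the values used in celery.py.

```python
import os


def broker_url_from_env(environ=os.environ):
    """Build an AMQP broker URL from environment variables.

    The RABBITMQ_* variable names are hypothetical. The defaults mirror
    the values hard-coded in celery.py.
    """
    user = environ.get('RABBITMQ_USER', 'admin')
    password = environ.get('RABBITMQ_PASSWORD', 'admin')
    host = environ.get('RABBITMQ_HOST', 'localhost')
    vhost = environ.get('RABBITMQ_VHOST', '/')   # '/' is the default virtual host
    return 'amqp://{}:{}@{}/{}'.format(user, password, host, vhost)


print(broker_url_from_env({}))
```

The result could then be passed as the broker argument, for example Celery('transcoder', broker=broker_url_from_env(), include=['taskapp.tasks']).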
Then in tasks.py I added the task itself:
from __future__ import absolute_import
import os
import os.path
import subprocess

from taskapp.celery import app
from audio_transcoder.models import AudioFile
import config.settings as settings


@app.task
def transcode_mp3(mp3_id):
    audio_file = AudioFile.objects.get(id=mp3_id)
    input_file_path = audio_file.mp3_file.path
    filename = os.path.basename(input_file_path)

    # File names are relative to MEDIA_ROOT; file paths are absolute.
    ogg_output_file_name = os.path.join('transcoded', '{}.ogg'.format(filename))
    ogg_output_file_path = os.path.join(settings.MEDIA_ROOT, ogg_output_file_name)
    ac3_output_file_name = os.path.join('transcoded', '{}.ac3'.format(filename))
    ac3_output_file_path = os.path.join(settings.MEDIA_ROOT, ac3_output_file_name)
    wav_output_file_name = os.path.join('transcoded', '{}.wav'.format(filename))
    wav_output_file_path = os.path.join(settings.MEDIA_ROOT, wav_output_file_name)

    # Create media/transcoded on first use.
    if not os.path.isdir(os.path.dirname(ogg_output_file_path)):
        os.makedirs(os.path.dirname(ogg_output_file_path))

    # One ffmpeg call with a single input and three outputs.
    subprocess.call([
        settings.FFMPEG_PATH,
        '-i',
        input_file_path,
        ogg_output_file_path,
        ac3_output_file_path,
        wav_output_file_path
    ])

    audio_file.ogg_file = ogg_output_file_name
    audio_file.ac3_file = ac3_output_file_name
    audio_file.wav_file = wav_output_file_name
    audio_file.save()
What do we have here? Let's start with the transcode_mp3 function. It has the @app.task decorator to tell Celery that it is a task. Its argument is mp3_id. After getting the id of the newly uploaded file, the task gets the audio_file model from the database and retrieves the path to the uploaded mp3 file. Then it generates file names and paths for every format: ogg, wav and ac3. Right after that it checks whether the folder transcoded under media is present and creates it if not. Calling subprocess is the same as calling ffmpeg -i mp3_file.mp3 ogg_file.ogg ac3_file.ac3 wav_file.wav. At the end the task saves the paths of the outputs to the database.
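The three near-identical name and path pairs in the task could be produced by one small helper. This is only a sketch of that idea, not the project's code: build_output_paths is a hypothetical name, and the example paths are made up.

```python
import os.path


def build_output_paths(input_file_path, media_root, formats=('ogg', 'ac3', 'wav')):
    """Map each format to (name relative to media_root, absolute path)."""
    filename = os.path.basename(input_file_path)
    paths = {}
    for extension in formats:
        relative_name = os.path.join('transcoded', '{}.{}'.format(filename, extension))
        paths[extension] = (relative_name, os.path.join(media_root, relative_name))
    return paths


# Made-up paths, only to show the shape of the result.
paths = build_output_paths('/srv/media/3f2a', '/srv/media')
for extension, (relative_name, absolute_path) in sorted(paths.items()):
    print(extension, relative_name, absolute_path)
```

The relative name is what would be stored on the model field, while the absolute path is what ffmpeg would write to.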
The task itself is called in the view:
from taskapp.tasks import transcode_mp3


class UploadAudioFileView(FormView):
    # ...
    def form_valid(self, form):
        # ...
        audio_file.save()
        transcode_mp3.delay(audio_file.id)
        # ...
Everything works as expected, and I added a redirect to the detail view of the audio file after a successful upload. The problem is that transcoding is not as fast as the redirect.
This and other bugs and small improvements will be addressed in the last post of this series next week. Thanks for reading! I really appreciate your feedback, so please comment or write me an email. The code is available in this GitHub repo.
Audio File detail view
The problem was that after a successful upload Django redirects to the detail view of the uploaded file. The HTML template of this view expects ac3_file to be there, but FFmpeg is still transcoding it. So I came up with a solution:
1. First, I added a new field called was_processed to the AudioFile model to indicate whether the file has been processed:
class AudioFile(models.Model):
    # ....
    was_processed = models.BooleanField(default=False)
By default, this field has the value False.
2. Then in my tasks module I added a signal handler that runs after every task:
from celery.signals import task_postrun


@task_postrun.connect
def task_executed_handler(sender=None, body=None, **kwargs):
    audio_file = AudioFile.objects.get(id=kwargs['args'][0])
    audio_file.was_processed = True
    audio_file.save()
In this handler the id of the AudioFile object is taken from kwargs. After retrieving that file from the database, the was_processed flag is set.
3. Lastly, in my audiofile_detail.html I added this code:
{% if object.was_processed %}
<ul>
<a href="{{ object.ac3_file.url }}">Ac3 File</a>
</ul>
<ul>
<a href="{{ object.ogg_file.url }}">Ogg File</a>
</ul>
<ul>
<a href="{{ object.wav_file.url }}">Wav File</a>
</ul>
{% endif %}
</ul>
</ul>
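One thing to keep in mind about the task_postrun handler from step 2: it fires after every task the worker runs, whether the task succeeded or not. The sketch below shows a small guard that could be used inside such a handler. It assumes the task name taskapp.tasks.transcode_mp3 and relies on the state value that Celery passes to task_postrun handlers; should_mark_processed is a hypothetical helper.

```python
TRANSCODE_TASK_NAME = 'taskapp.tasks.transcode_mp3'


def should_mark_processed(task_name, state, expected_name=TRANSCODE_TASK_NAME):
    """Return True only for a successful run of the transcoding task.

    `state` is the string Celery passes to task_postrun handlers,
    for example 'SUCCESS' or 'FAILURE'.
    """
    return task_name == expected_name and state == 'SUCCESS'


# Inside the handler this could be used as (sketch, not run here):
#     if should_mark_processed(sender.name, kwargs.get('state')):
#         ...set was_processed and save...
print(should_mark_processed('taskapp.tasks.transcode_mp3', 'SUCCESS'))
print(should_mark_processed('taskapp.tasks.transcode_mp3', 'FAILURE'))
```

Note that the state is only 'FAILURE' when the task raises. subprocess.call returns ffmpeg's exit code instead of raising, so the task would have to check that code itself for this guard to catch a failed transcode.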
Logging
Right now everything works great, but what if something goes wrong? To make sure that I will be able to find the issue, I need logging. This is especially valuable for Celery, because Django doesn't show output from Celery, as it is a separate application. To set up basic logging I only need to add a few things in tasks.py:
from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)


@app.task
def transcode_mp3(mp3_id):
    # ...
    logger.debug(
        'Created output files: %s, %s, %s.',
        ogg_output_file_path,
        ac3_output_file_path,
        wav_output_file_path
    )
    logger.info('Started transcoding.')
    # transcoding here
    logger.info('End of transcoding.')
    # rest of program ...
Thanks to that I can see in my console:
[2016-03-19 09:55:07,184: INFO/Worker-4] taskapp.tasks.transcode_mp3[b6ca93d4-e58c-496f-b8e5-4ba493b8a92a]: Started transcoding.
# transcoding ...
[2016-03-19 09:55:11,837: INFO/Worker-4] taskapp.tasks.transcode_mp3[b6ca93d4-e58c-496f-b8e5-4ba493b8a92a]: End of transcoding.
Summary
I made a basic transcoder application that uses FFmpeg, Django, Celery and RabbitMQ. I learned quite a bit about how Celery works with RabbitMQ and Django, and along the way I stumbled upon some useful blog posts. I also see some issues with my solutions, for instance today's AudioFile detail view. I'm thinking about other ways to solve this problem, because right now I need two database operations for one file. Maybe you know a solution to this? I'm really keen to hear your view on this issue or any other comments, so feel free to write a comment or send me an email! Thanks to everyone who gives me feedback; I really appreciate it! The code for this blog post can be found on GitHub.