Livro sobre C e Linux

Hoje decidi abrir uma exceção e postar esse link aqui. É um livro que escrevi há 9 anos e nunca foi revisado nem publicado, mas que ainda pode ser útil para muitos novos programadores.

 


cyclone ganha um site: cyclone.io

Há pouco mais de 2 anos atrás, o FriendFeed havia sido comprado pelo Facebook, e uma das coisas mais interessantes pra nós, na época, foi a publicação do web server que eles haviam desenvolvido para uso interno. Era Setembro de 2009.
Sua descrição permanece a mesma até hoje:

Tornado is an open source version of the scalable, non-blocking web server and tools that power FriendFeed. The FriendFeed application is written using a web framework that looks a bit like web.py or Google’s webapp, but with additional tools and optimizations to take advantage of the underlying non-blocking infrastructure.

Muita gente reclamou pelo fato do FriendFeed não ter procurado o pessoal do Twisted, pois desde os primórdios já era a lib padrão pra programar servidores assíncronos em Python. Ao invés disso, os caras do FriendFeed postaram coisas reclamando do Twisted – que era lento, mal documentado, etc; tudo que ainda reclamam nos dias de hoje. Veja aqui o anúncio do lançamento do Tornado, por Bret Taylor.

O fato é que o Tornado usava um IO loop próprio, e era bem mais rápido que a API nativa do Twisted pra web, Twisted Web. Além disso, o Tornado era baseado no framework que se chamava de anti-framework, web.py, por ser absurdamente simples. Mais ainda, o próprio Google havia publicado no mesmo ano o App Engine, que também era baseado no web.py. O resultado é que escrever um web service usando Tornado era muito mais interessante e obviamente mais simples que usar o Twisted Web.

Mas, como quase tudo, o Tornado tinha um grande problema: nenhum protocolo nativo do Python funcionava naquele IO loop. Nem mesmo coisas básicas como fazer uma query em DNS, ou acessar um PostgreSQL. O Twisted, já naquela época, tinha uma lista enorme de protocolos nativos, todos implementados pro IO loop do Twisted, e ainda mais protocolos que a própria lib padrão do Python. Protocolos como IRC, SSH, e suporte nativo pra qualquer database suportado pela DB API do Python, todos podiam ser mixados de maneira assíncrona no mesmo app, e coisas alucinantes podiam acontecer dentro de um web server.

Mas, justamente o protocolo HTTP era implementado pro Twisted Web, e o grupo de pessoas que cuidava disso havia criado o que seria um framework completo pra desenvolver aplicativos web, chamado Divmod. Por isso, eles não deram muita atenção pro Tornado, e pareciam focados nos componentes do Divmod que incluíam entre outros, até um sistema de banco de dados alternativo.

Dois dias depois do anúncio do Tornado, o glyph (cara que criou o Twisted) publicou um texto em seu blog dizendo o que ele gostaria que o Tornado fosse. Esse texto fez com que várias pessoas ficassem reclamando e chorando na teia, tipo esse post aqui.

Naquela época, eu estava desenvolvendo algumas coisas com Twisted Web, e obviamente fui testar o Tornado. No #twisted da freenode havia mais gente como eu, e o que mais havia progredido com algo interessante era um cara chamado Dustin Sallings. Ao invés de ficar chorando, ele havia portado o Tornado pra Twisted. Era exatamente o que deveria ter sido desde o começo. Em algumas conversas, ele havia dito que “estava pronto”.

Então, nasceu o cyclone. O motivo era simples: o Twisted Web era uma API pra criar web servers, e muito bagunçado pra criar web apps. O Tornado, era uma API muito decente, mas servia apenas pra criar web apps, sem suporte a nenhum outro protocolo além de HTTP. O cyclone, um mix dos dois: a API do Tornado pra criar web apps, mas que interagem com todos bancos de dados possíveis, e ainda todos protocolos do Twisted – incluindo os protocolos que se comunicam com sub-sistemas de telefonia.

Nos seus 2 anos de vida, o cyclone evoluiu muito. Muita gente contribuiu, mandou patch, arrumou bugs, adicionou coisas… e recebo alguns emails por semana de gente que usa e gosta, e agradece por ter algo como o cyclone disponível. Além de ser extremamente estável, passou a ter suporte nativo a sqlite, redis, mongodb, e uma gigantesca lista de features – tem até uma API semelhante à do bottle como opção. Coisas muito interessantes como o Nuswit, o RestMQ e o FreeGeoIP são baseados no cyclone.

Finalmente, peguei o domínio e o Gleicon comandou no site: cyclone.io


Puts! Só de pensar que já fiz crawler pra classificação do brasileirão… e agora o globoesporte.com tem isso: classificacao.json

Agora vou atrás do calendário compartilhado do Brasileirão 2011.


twisted adbapi: nomes de colunas em query

Quem já usou o twisted.enterprise.adbapi deve ter notado a falta de uma funcionalidade muitas vezes necessária na execução de queries (SELECT) no banco, seja ele qual for: colocar o nome das colunas no resultado, de forma que cada linha seja um dicionário e não um simples set.

Considerando que o adbapi é apenas um wrapper do Python Database API v2.0 (PEP-249), obviamente existem motivos pra essa funcionalidade não estar lá (além da preguiça de alguns). No documento, o primeiro item do FAQ:

Question: 

       How can I construct a dictionary out of the tuples returned by
       .fetch*():

    Answer:

       There are several existing tools available which provide
       helpers for this task. Most of them use the approach of using
       the column names defined in the cursor attribute .description
       as basis for the keys in the row dictionary.

       Note that the reason for not extending the DB API specification
       to also support dictionary return values for the .fetch*()
       methods is that this approach has several drawbacks:

       * Some databases don't support case-sensitive column names or
         auto-convert them to all lowercase or all uppercase
         characters.

       * Columns in the result set which are generated by the query
         (e.g.  using SQL functions) don't map to table column names
         and databases usually generate names for these columns in a
         very database specific way.

       As a result, accessing the columns through dictionary keys
       varies between databases and makes writing portable code
       impossible.

Na busca por uma solução, até encontrei um patch pro twisted, que adiciona um método runQueryMapped e retorna uma lista de dicionários, como eu queria. Porém, aplicar patch no twisted é furada, pois o código só funcionaria nas máquinas cujo twisted tem o tal patch. Fora de cogitação.

A solução mais simples (e tosca) que encontrei foi a que funcionou melhor:

# coding: utf-8
# hack for twisted.enterprise.adbapi.ConnectionPool class, providing
# a new method mapQuery (just like runQuery) whose return value
# is a list of dictionaries that map column names to values.

from twisted.enterprise import adbapi

class hackPool(adbapi.ConnectionPool):
    def _mapQuery(self, trans, *args, **kw):
        trans.execute(*args, **kw)
        rs, new_rs = trans.fetchall(), []
        names = [d[0] for d in trans.description]
        for values in rs:
            row = dict()
            for k, v in zip(names, values):
                row[k] = v
            new_rs.append(row)
        return new_rs

    def mapQuery(self, *args, **kw):
        return self.runInteraction(self._mapQuery, *args, **kw)

Dessa maneira, o hack fica no próprio código e não requer nenhum patch. Dá-lhe gambi.


yeah, tudo assíncrono!

Ultimamente tem sido tudo assim, assíncro. O Nuswit já vai fazer aniversário de 1 ano, e vale lembrar que está em produção contínua, sem dar nenhuma manutenção.

Já estou mais que convencido que o caminho pros próximos anos dessa década não pode ser outro, ainda mais com o WebSocket no w3c.

Para contribuir com a interwebs, tenho mantido os seguintes projetos:

http://github.com/fiorix/cyclone
Um clone do Tornado, webserver assíncrono do FriendFeed, que desde o ano passado é do Facebook. Essa implementação, batizada de Cyclone, tem algumas diferenças:

  • Core I/O baseado no Twisted
  • Suporte nativo a XMLRPC
  • Suporte a localização baseada no gettext – ao invés do CSV, original do Tornado

Com vários aplicativos de exemplos, todos os plug-ins do Tornado para autenticação no Google, Twitter, Facebook, OAuth, OpenID, etc…

O RestMQ (coisas do Gleicon, que ajudei a implementar) é baseado nele. A nova versão do Nuswit também será.

http://github.com/fiorix/twisted-twitter-stream
Uma API bem simples para acessar a Streaming API do Twitter. Provê suporte a todos os métodos publicados pela API.

Não depende do TwistedWeb, a implementação do HTTP 1.1 está inteira no código – na verdade, apenas o lado do client com suporte a Comet.

Permite criar sistemas como este.

http://github.com/fiorix/txredisapi
Um driver assíncrono pro Redis, também baseado no Twisted. O protocolo de comunicação já existia, mas era carente de algumas coisas, que implementei:

Além de estável, é muito rápido! Também foi usado no RestMQ, e aparentemente, está se tornando popular. Hoje achei algumas referências enquanto procurava no Google.

http://github.com/fiorix/mongo-async-python-driver
Outro driver de banco de dados, pro MongoDB. O driver original para Python é síncrono, o que dificulta (embora não impossibilita) de usar em sistemas assíncronos, especialmente baseados no Twisted.

Boa parte da implementação é baseada no pymongo original, inclusive o codec de BSON (em C), formato binário usado pelo Mongo, baseado em JSON.

Provavelmente se tornará o driver assíncrono oficial do Mongo para Python+Twisted, e está em vias de se tornar estável – isso devido às várias mudanças na API, e implementação de vários recursos incluindo Lazy Connections, e Document Reference.

Também, já tem algumas pessoas de olho no GitHub, acompanhando o desenvolvimento.

Entre os vários dbs nosql (couch, redis, etc) o Mongo é um dos mais completos, com uma linguagem de query muito decente, entre os vários outros recursos nativos. O fato de usar mmap para acessar os dados também faz com que ele seja muito rápido.


mongodb e twisted

Há algum tempo venho fazendo testes com o MongoDB pra casos específicos onde um RDBM tradicional como MySQL ou PostgreSQL não se encaixa muito bem.

Um dos casos onde um banco de dados baseado em objetos como o MongoDB se encaixa perfeitamente, é em um dos meus sistemas comerciais de telefonia, o Nuswit.

Lá, o usuário pode criar uma campanha de tele mensagem e colocar variáveis, que serão usadas para ligar para as pessoas e falar algumas coisas dinâmicas, sintetizando o texto em voz. Hoje, cada vez que o usuário cria uma campanha, pode importar uma planilha ou arquivo CSV, e então o sistema uma cria nova tabela no Sqlite com os campos que o usuário definiu na campanha, de acordo com essas tais variáveis.

Por isso, não é possível ter uma tabela estática, muito menos fazer relacionamentos pra usar a tabela no estilo chave=valor, pois essa mesma tabela é usada pros relatórios que o usuário baixa após o término da campanha.

Nesse caso, o MongoDB se encaixa perfeitamente. É muito mais simples criar uma coleção de dados com o mesmo nome da campanha, e importar documentos tipo JSON (nome=x, telefone=y, cpf=z) do que criar uma nova tabela com esses campos.

Além do mais, a API do pymongo é muito mais decente do que qualquer coisa parecida com SQL, pois os databases e collections são objetos do Python.

O único problema com o pymongo é que ele foi feito pra controlar e manter um pool de conexões com o banco, totalmente síncrono. Pra usar o pymongo em servidores como os que tenho feito ultimamente, assíncronos, baseados em Twisted, é necessário mandar todas as chamadas do banco pra um thread (usando callInThread ou deferToThread).

Pra solucionar esse problema, passei a frequentar o #mongodb na freenode, e em contato com o autor do pymongo, acabei criando uma versão assíncrona do driver, baseado em Twisted, que mantém o mesmo estilo da API original.

Agora, a integração entre Twisted e MongoDB está muito mais decente, usando pymonga.


mais sobre crawlers e spiders

logo_mercadolivreNo mês passado escrevi um artigo com um programa para capturar todos os items da primeira página de cada categoria do MercadoLivre.

Lá, lidava com alguns problemas como:

  • limite de concorrência no download das páginas
  • processamento de html em thread, síncrono
  • manter a maior parte do processo assíncrono, para ganhar tempo e CPU

Depois disso, precisei fazer umas alterações no código e acabei modificando um pouco programa, usando outras técnicas como:

Cada item desta lista corresponde aos itens da lista mais acima, respectivamente.

O esquema de cooperação do twisted é muito melhor que o Controller que havia criado anteriormente. Porém, muito mais complicado para jovens aprendizes. Recomendo este link para mais detalhes.

Sobre o processamento do html, vale a pena verificar o lxml. Antes, havia usado BeautifulSoup, que é muito bom, mas perde violentamente em desempenho e suporte a broken-html.

Por fim, o truque de usar generators para executar alguns callbacks inline é incrível, e absurdamente prático em casos como esse, do programa abaixo.

O resultado é final é o mesmo, mas a melhoria em desempenho é absurda. Fiz alguns testes na minha máquina e obtive o seguinte:

  • esta versão consome, em média, 20% menos de CPU
  • como não há necessidade de gravar os arquivos no disco, não consome disco
  • o processo todo ficou 657% mais rápido, simplesmente
  • ainda, o código é muito menor +_+

Veja ai:

#!/usr/bin/env python
# coding: utf-8

from lxml import html
from twisted.web import client
from twisted.python import log
from twisted.internet import task, defer, reactor

class MercadoLivre:
    def __str__(self):
        return 'http://www.mercadolivre.com.br/jm/ml.allcategs.AllCategsServlet'

    def parse_categories(self, content):
        category = subcategory = ''
        doc = html.fromstring(content)
        for link in doc.iterlinks():
            el, attr, href, offset = link
            try: category = el.find_class('categ')[0].text_content()
            except: pass
            else: continue
            if category:
                try: subcategory = el.find_class('seglnk')[0].text_content()
                except: continue
                else: yield (href, category, subcategory)

    def parse_subcategory(self, content):
        doc = html.fromstring(content)
        for element in doc.find_class('col_titulo'):
            yield element[0].text_content()

class Engine:
    def finish(self, result):
        reactor.stop()

    @defer.inlineCallbacks
    def fetch_categories(self, link, parser):
        try:
            doc = yield client.getPage(link)
            defer.returnValue(parser(doc))
        except Exception, e:
            print e

    def fetch_subcategory(self, links, parser, limit):
        coop = task.Cooperator()
        work = (client.getPage(link[0]).addCallback(parser).addCallback(self.page_items, *link) for link in links)
        result = defer.DeferredList([coop.coiterate(work) for x in xrange(limit)])
        result.addCallback(self.finish)
        result.addErrback(log.err)

    def page_items(self, items, href, category, subcategory):
        print 'Categoria: %s / %s' % (category.encode('utf-8'), subcategory.encode('utf-8'))
        for item in items: print ' -> %s' % item.encode('utf-8')
        print ''

def main(limit, *parsers):
    e = Engine()
    for parser in parsers:
        links = e.fetch_categories(str(parser), parser.parse_categories)
        links.addCallback(e.fetch_subcategory, parser.parse_subcategory, limit)

if __name__ == '__main__':
    reactor.callWhenRunning(main, 150, MercadoLivre())
    reactor.run()

autenticação no google, pro appengine

Google AppEngineAcabo de resolver um problema que vem me irritando muito nos últimos meses: sincronizar uma quande quantidade de dados com o appengine.

Depois de passar muita raiva com o bulkloader, que inevitavelmente acaba gerando esses Datastore Timeout, decidi fazer um outro esquema que funcionou muito melhor. Um dia coloco aqui, mas não agora.

O lance é que pra enviar os dados, habilitei o https e também autenticação. Assim, os blocos de dados podem ser sincronizados com segurança, apenas por um administrador da aplicação.

O problema foi fazer meu script se autenticar no Google Accounts. Apesar da API de autenticação do appengine, preferi não usá-la pois não há necessidade no meu caso, e optei por usar o formulário genérico do Google.

Encontrei este documento com alguns exemplos e dúvidas, mas não foi suficiente. Finalmente, entendi como o negócio funciona:

  • Faz um POST no serviço do Google Accounts enviando usuário, senha, nome de identificação e URL do app;
  • O Google Accounts irá fornecer um token, que então deve ser passado pro sistema de autenticação do próprio app (igual ao dev_appserver.py)
  • Nesse último request, o Google fornece um Cookie chamado ACSID, que deverá ser usado em todos as próximas requisições.

E pra resolver esse problema, escrevi uma classe chamada GoogleAuth, que recebe os dados necessários, se autentica no Google Accounts, e passa a fornecer o ACSID como string. Em caso de falha, tipo usuário ou senha errada, a classe gera um ValueError.

#!/usr/bin/env python
# coding: utf-8

import sys, urllib, urllib2

class GoogleAuth:
    cookie = ''

    # save the ACSID cookie before redirecting
    class RedirectHandler(urllib2.HTTPRedirectHandler):
        def __init__(self, klass): self.klass = klass
        def http_error_302(self, req, fp, code, msg, headers):
            try: self.klass.cookie = headers.get('Set-Cookie').split(';')[0]
            except: pass
            return urllib2.HTTPRedirectHandler.http_error_302(self, req, fp, code, msg, headers)

    # authenticate
    def __init__(self, appurl, appname, username, password):
        googauth = 'https://www.google.com/accounts/ClientLogin'

        # prepare the request data
        request_data = dict(
            Email=username,
            Passwd=password,
            source=appname,
            service='ah',
            accountType='HOSTED_OR_GOOGLE',
            )

        # get the token from google auth
        fd = urllib2.urlopen(googauth, urllib.urlencode(request_data))
        auth_dict = dict(x.split('=') for x in fd.read().split('\n') if x)
        fd.close()

        token = auth_dict.get('Auth')
        if auth_dict.get('Error') or not token:
            raise ValueError('authentication failed.')

        # get the ACSID cookie for further athenticated requests
        opener = urllib2.build_opener(self.RedirectHandler(self))
        fd = opener.open(appurl+'_ah/login?'+urllib.urlencode({'continue':appurl, 'auth':token}))
        fd.close()

    # return the cookie
    def __str__(self):
        return self.cookie

if __name__ == '__main__':
    appurl = 'http://myapp.appspot.com/'
    appname  = 'My App'
    username = 'foo'
    password = 'bar'

    # authenticate
    try: acsid = GoogleAuth(appurl, appname, username, password)
    except Exception, e:
        print str(e)
        sys.exit(1)

    # now access resources that require authentication...
    print 'authentication cookie:', acsid
    request = urllib2.Request(appurl, headers={'Cookie':acsid})
    chunk = urllib2.urlopen(request).read()

twisted crawler, alvo: mercadolivre

logo_mercadolivreJá pensou em fazer um programa que acessa o Mercado Livre, identifica o link de cada categoria, e extrai todos os produtos da primeira página de cada uma dessas categorias?

Pode até parecer complexo, mas não é. Esse programa existe, é simples, e está aqui, neste artigo, pronto pra você testar e modificar. :)

A algumas semanas tenho feito alguns crawlers pra esses sites que vendem produtos, como o Web Motors, Submarino, e até mesmo o eBay. A idéia é simplesmente extrair todos os produtos da primeira página de cada categoria, e montar uma base de dados com essas informações. O objetivo? Segredo de Estado.

Um dos problemas que encontrei ao fazer esse tipo de crawler, é controlar a concorrência de acesso. Se o programa acessa uma categoria por vez, demora uma eternidade pra baixar e processar cada página. Por outro lado, se ele acessa todas as categorias descontroladamente, o processo acaba criando muitos File Descriptors e isso causa diversos outros problemas pro sistema operacional – e não adianta falar em aumentar o limite usando ulimit -n, porque esses crawlers têm baixado milhões de links por dia.

A melhor maneira que encontrei pra solucionar esse problema, foi criando uma classe chamada “Controller”, que coloca as requisições em uma fila, baixa N páginas por vez, e ao invés de processá-las, simplesmente salva em um arquivo no disco.

Todos esses arquivos, com nomes únicos, são gerados usando um hash SHA1, e colocados em uma lista para serem processados offline, após o processo de download terminar.

O twisted tem um recurso muito interessante, que permite agrupar diversos deferreds em um único, e quando todos eles terminam, executa um callback. Nesse caso, quando todos os processos de baixar a página da subcategoria terminam, executa a função “offline”, que começa a processar cada uma, e extrair os produtos de lá.

Esta segunda etapa também poderia ser feita em paralelo, usando o twisted-parallels, mas decidi não colocar porque o código ficaria muito maior e você não teria tanta paciência pra entendê-lo. Usar um thread pool pra isso não valeria a pena, pois vale lembrar que o python usa o GIL, e I/O em thread só consome recurso e não aumenta em nada o desempenho.

A classe “MercadoLivre” tem 3 funções:

  • __str__: retorna o link com todas as categorias do Mercado Livre
  • parse_categories: um parser que retorna uma lista composta por tuples de (url, nome)
  • parse_subcategory: um parser pro conteúdo de cada categoria, que retorna uma lista com o título dos produtos anunciados lá

Todos esses parsers são baseados no BeautifulSoup 3.1. É necessário tê-lo instalado pra usar o programa.

Por fim, para usar este programa em outros sites, basta substituir a classe “MercadoLivre” pela sua própria, tipo “Submarino”.

Detalhe: essa porcaria de WordPress mostra a identação do código errada no artigo, mas se você clicar em “view plain”, poderá copiar e colar o código correto. Chame o suporte!

Aqui o código (muito belo, por sinal…):

#!/usr/bin/env python
# coding: utf-8

import os, re, sys
import shutil, hashlib
from Queue import Queue
from BeautifulSoup import BeautifulSoup

# twisted
from twisted.web import client
from twisted.internet import defer, threads, reactor

class MercadoLivre:
noscript = re.compile(r”(?is)]*>(.*?)“)

def __str__(self):
return ‘http://www.mercadolivre.com.br/jm/ml.allcategs.AllCategsServlet’

def parse_categories(self, content):
catlist = {}
cleanup = lambda s: s.replace(‘\n’, ”).strip()

# parse the document
soup = BeautifulSoup(content)

# find all categories and their items
current = ”
for item in soup.findAll(‘a’, {‘class’:[‘categ’,’seglnk’]}):
text = cleanup(item.contents[0])
attrs = dict(item.attrs)
if attrs.get(u’class’) == u’categ’:
current = text
catlist[current] = []
else:
catlist[current].append((attrs.get(u’href’, ”), text))

# return the list of categories and their items
return catlist

def parse_subcategory(self, content):
result = []
soup = BeautifulSoup(self.noscript.sub(”, content),
convertEntities=BeautifulSoup.HTML_ENTITIES)

for item in soup.findAll(‘div’, {‘class’:’col_titulo’}):
result.append(item.find(‘a’).contents[0].strip())

return result

class Controller:
def __init__(self, fetch):
self.count = 0
self.limit = fetch
self.queue = Queue()
self.tmpdir = ‘/tmp/mercadolivre.%d’ % os.getpid()
self.dispatch()

def encode(self, text):
return unicode(text).encode(‘utf-8’, ‘replace’)

def getPage(self, url, *args, **kwargs):
d = defer.Deferred()
self.queue.put((d, url, args, kwargs))
return d

def dispatch(self):
while True:
try:
assert self.count < self.limit d, url, args, kwargs = self.queue.get_nowait() except: break self.count += 1 deferred = client.getPage(url, *args, **kwargs) deferred.addBoth(self.decrease_count) deferred.chainDeferred(d) reactor.callLater(1, self.dispatch) def decrease_count(self, result): self.count -= 1 return result class main(Controller): def __init__(self, fetch, parser): # set the concurrent download limit for the crawler Controller.__init__(self, fetch) # fetch the main categories page self.total = 0 self.files = [] self.parser = parser d = self.getPage(str(parser), timeout=60) d.addCallback(self.fetch_categories) d.addErrback(self.error_categories) def error_categories(self, error): # hmmm... fatal error, cannot continue print 'cannot fetch categories from %s: %s' % (str(self.parser), str(error)) reactor.stop() def error_subcategory(self, error, href, category, subcategory): # problem fetching subcategory contents... print 'error "%s / %s": [%s] %s' % (category, subcategory, href, error.value) def fetch_categories(self, content): # parse the contents in a thread reactor.callInThread(self.parse_categories, content) def parse_categories(self, content): try: categories = self.parser.parse_categories(content) except Exception, e: print 'error parsing categories: %s' % str(e) reactor.stop() return print 'going to fetch %d categories...' % len(categories) tasks = [] for category, contents in categories.items(): category = self.encode(category) for href, subcategory in contents: href, subcategory = self.encode(href), self.encode(subcategory) d = self.getPage(href, timeout=60) d.addCallback(self.save_subcategory, href, category, subcategory) d.addErrback(self.error_subcategory, href, category, subcategory) tasks.append(d) # call the offline subcategory parser after downloading everything... d = defer.gatherResults(tasks) d.addCallback(self.offline) def save_subcategory(self, contents, href, category, subcategory): # create the tmpdir if it doesn't exist if not os.path.exists(self.tmpdir): os.mkdir(self.tmpdir) # create a unique hash for each category hash = hashlib.new('sha1', category+subcategory).hexdigest() filename = os.path.join(self.tmpdir, hash+'.dump') try: fd = open(filename, 'w') fd.write(contents) fd.close() except: return self.files.append((href, category, subcategory, filename)) print 'saving "%s / %s": %s' % (category, subcategory, hash) def offline(self, null): # start processing each subcategory... reactor.stop() for item in self.files: href, category, subcategory, filename = item sys.stdout.write('parsing "%s / %s": ' % (category, subcategory)) sys.stdout.flush() fd = open(filename) try: results = self.parser.parse_subcategory(fd.read()) except Exception, e: print 'error! %s' % str(e) continue lr = len(results) self.total += lr print '%d items' % lr for result in results: print ' ' + self.encode(result) print '\n%d items processed. cleaning up!' % self.total shutil.rmtree(self.tmpdir) if __name__ == '__main__': reactor.callWhenRunning(main, 10, MercadoLivre()) reactor.run() [/sourcecode] Agora, é só você se dedicar e fazer isso pros sites que te interessam. Notas?


paralelismo: python e twisted

parent-childExecutar algumas tarefas em paralelo no python, usando threads, pode ser problemático, especialmente quando há I/O envolvido.

Apesar do interpretador do python usar as threads nativas do sistema operacional (na maioria dos *nix, pthreads), ele não é 100% thread-safe. Isso não é um bug, mas sim um recurso usado para evitar travamento ou computação incorreta de valores.

Do texto original:

The Python interpreter is not fully thread safe. In order to support multi-threaded Python programs, there’s a global lock, called the global interpreter lock or GIL, that must be held by the current thread before it can safely access Python objects. Without the lock, even the simplest operations could cause problems in a multi-threaded program: for example, when two threads simultaneously increment the reference count of the same object, the reference count could end up being incremented only once instead of twice.

Therefore, the rule exists that only the thread that has acquired the global interpreter lock may operate on Python objects or call Python/C API functions. In order to support multi-threaded Python programs, the interpreter regularly releases and reacquires the lock — by default, every 100 bytecode instructions (this can be changed with sys.setcheckinterval()). The lock is also released and reacquired around potentially blocking I/O operations like reading or writing a file, so that other threads can run while the thread that requests the I/O is waiting for the I/O operation to complete.

Do outro lado, temos o Twisted. Para mim, o Twisted é uma super biblioteca de I/O, com todos os recursos necessários para tratar tarefas de modo assíncrono (non-blocking), considerando ainda aquelas que bloqueiam a execução por determinado tempo (blocking).

Internamente, o Twisted mantém um thread pool para executar as tarefas síncronas, como por exemplo executar um INSERT em banco de dados. Para tal, existem as funções threads.deferToThread e reactor.callInThread.

Porém, se considerarmos o GIL, mencionado acima, temos como resultado um belo problema: enquanto uma tarefa que necessita I/O síncrono está sendo executada em uma thread, o interpretador do python fica bloqueado naquela operação e todo o resto fica parado. Isso causa uma perda de desempenho sem tamanho, e diversos outros efeitos colaterais (como time-out em sockets, etc).

Solucionar o problema de paralelismo no python não é tão complicado, mas também não é tão simples. Existem dois módulos que conheço, que fazem isso: pyprocessing e multiprocessing.

Ambos usam uma API similar à do módulo threading, mas ao invés de threads, criam processos usando fork(), que por sua vez, executam outro interpretador do python e se livram dos efeitos do GIL em um único processo. Mas, quando usados em conjunto com o Twisted, necessitam diversas adaptações para a comunicação entre os processos pai e filhos, pois essa comunicação é feita através de pipes.

Com tudo isso junto, o GIL começa a se tornar um problema e a coisa toda já se parece com uma grande confusão, que para muitos já parece não ter solução decente. Em suma, é o seguinte: escrever programas que necessitam parelelismo em python exige usar pyprocessing ou multiprocessing, mas quando o programa é inteiro assíncrono usando Twisted, tudo fica complicado.

Isso obviamente aconteceu comigo em um sistema relativamente grande, que precisava executar a classificação de alguns dados em paralelo, e quando usava o thread pool do Twisted, o processo inteiro ficava lento devido à grande quantidade de I/O para ler e gravar arquivos no disco. Era o GIL me atrapalhando.

Para solucionar esse problema, escrevi um módulo que usa a função spawnProcess do próprio Twisted, e automaticamente trata do pipe entre os processos pai e filho. Ainda, escrevi um protocolo de comunicação entre eles que permite transmitir e receber dados entre os processos de modo transparente, de maneira assíncrona.

Ainda, esse módulo possui uma classe que cria um pool de processos (não threads) para onde é possível despachar dados para serem processados em paralelo, e aguardar pelo resultado em um deferred, seguindo todo o padrão do Twisted.

Depois de executar todos os testes necessários e deixar o código estável, implementei isso no meu sistema e resolvi todo o problema do paralelismo de maneira simples e elegante, sem nenhuma gambiarra nem esquisitisse.

Denominado Twisted-Parallels, o módulo foi liberado sob a GPL v2 e está disponível no Google Code, com alguns exemplos de utilização.