yeah, tudo assíncrono!

Ultimamente tem sido tudo assim, assíncro. O Nuswit já vai fazer aniversário de 1 ano, e vale lembrar que está em produção contínua, sem dar nenhuma manutenção.

Já estou mais que convencido que o caminho pros próximos anos dessa década não pode ser outro, ainda mais com o WebSocket no w3c.

Para contribuir com a interwebs, tenho mantido os seguintes projetos:

http://github.com/fiorix/cyclone
Um clone do Tornado, webserver assíncrono do FriendFeed, que desde o ano passado é do Facebook. Essa implementação, batizada de Cyclone, tem algumas diferenças:

  • Core I/O baseado no Twisted
  • Suporte nativo a XMLRPC
  • Suporte a localização baseada no gettext – ao invés do CSV, original do Tornado

Com vários aplicativos de exemplos, todos os plug-ins do Tornado para autenticação no Google, Twitter, Facebook, OAuth, OpenID, etc…

O RestMQ (coisas do Gleicon, que ajudei a implementar) é baseado nele. A nova versão do Nuswit também será.

http://github.com/fiorix/twisted-twitter-stream
Uma API bem simples para acessar a Streaming API do Twitter. Provê suporte a todos os métodos publicados pela API.

Não depende do TwistedWeb, a implementação do HTTP 1.1 está inteira no código – na verdade, apenas o lado do client com suporte a Comet.

Permite criar sistemas como este.

http://github.com/fiorix/txredisapi
Um driver assíncrono pro Redis, também baseado no Twisted. O protocolo de comunicação já existia, mas era carente de algumas coisas, que implementei:

Além de estável, é muito rápido! Também foi usado no RestMQ, e aparentemente, está se tornando popular. Hoje achei algumas referências enquanto procurava no Google.

http://github.com/fiorix/mongo-async-python-driver
Outro driver de banco de dados, pro MongoDB. O driver original para Python é síncrono, o que dificulta (embora não impossibilita) de usar em sistemas assíncronos, especialmente baseados no Twisted.

Boa parte da implementação é baseada no pymongo original, inclusive o codec de BSON (em C), formato binário usado pelo Mongo, baseado em JSON.

Provavelmente se tornará o driver assíncrono oficial do Mongo para Python+Twisted, e está em vias de se tornar estável – isso devido às várias mudanças na API, e implementação de vários recursos incluindo Lazy Connections, e Document Reference.

Também, já tem algumas pessoas de olho no GitHub, acompanhando o desenvolvimento.

Entre os vários dbs nosql (couch, redis, etc) o Mongo é um dos mais completos, com uma linguagem de query muito decente, entre os vários outros recursos nativos. O fato de usar mmap para acessar os dados também faz com que ele seja muito rápido.

Anúncios

mongodb e twisted

Há algum tempo venho fazendo testes com o MongoDB pra casos específicos onde um RDBM tradicional como MySQL ou PostgreSQL não se encaixa muito bem.

Um dos casos onde um banco de dados baseado em objetos como o MongoDB se encaixa perfeitamente, é em um dos meus sistemas comerciais de telefonia, o Nuswit.

Lá, o usuário pode criar uma campanha de tele mensagem e colocar variáveis, que serão usadas para ligar para as pessoas e falar algumas coisas dinâmicas, sintetizando o texto em voz. Hoje, cada vez que o usuário cria uma campanha, pode importar uma planilha ou arquivo CSV, e então o sistema uma cria nova tabela no Sqlite com os campos que o usuário definiu na campanha, de acordo com essas tais variáveis.

Por isso, não é possível ter uma tabela estática, muito menos fazer relacionamentos pra usar a tabela no estilo chave=valor, pois essa mesma tabela é usada pros relatórios que o usuário baixa após o término da campanha.

Nesse caso, o MongoDB se encaixa perfeitamente. É muito mais simples criar uma coleção de dados com o mesmo nome da campanha, e importar documentos tipo JSON (nome=x, telefone=y, cpf=z) do que criar uma nova tabela com esses campos.

Além do mais, a API do pymongo é muito mais decente do que qualquer coisa parecida com SQL, pois os databases e collections são objetos do Python.

O único problema com o pymongo é que ele foi feito pra controlar e manter um pool de conexões com o banco, totalmente síncrono. Pra usar o pymongo em servidores como os que tenho feito ultimamente, assíncronos, baseados em Twisted, é necessário mandar todas as chamadas do banco pra um thread (usando callInThread ou deferToThread).

Pra solucionar esse problema, passei a frequentar o #mongodb na freenode, e em contato com o autor do pymongo, acabei criando uma versão assíncrona do driver, baseado em Twisted, que mantém o mesmo estilo da API original.

Agora, a integração entre Twisted e MongoDB está muito mais decente, usando pymonga.


mais sobre crawlers e spiders

logo_mercadolivreNo mês passado escrevi um artigo com um programa para capturar todos os items da primeira página de cada categoria do MercadoLivre.

Lá, lidava com alguns problemas como:

  • limite de concorrência no download das páginas
  • processamento de html em thread, síncrono
  • manter a maior parte do processo assíncrono, para ganhar tempo e CPU

Depois disso, precisei fazer umas alterações no código e acabei modificando um pouco programa, usando outras técnicas como:

Cada item desta lista corresponde aos itens da lista mais acima, respectivamente.

O esquema de cooperação do twisted é muito melhor que o Controller que havia criado anteriormente. Porém, muito mais complicado para jovens aprendizes. Recomendo este link para mais detalhes.

Sobre o processamento do html, vale a pena verificar o lxml. Antes, havia usado BeautifulSoup, que é muito bom, mas perde violentamente em desempenho e suporte a broken-html.

Por fim, o truque de usar generators para executar alguns callbacks inline é incrível, e absurdamente prático em casos como esse, do programa abaixo.

O resultado é final é o mesmo, mas a melhoria em desempenho é absurda. Fiz alguns testes na minha máquina e obtive o seguinte:

  • esta versão consome, em média, 20% menos de CPU
  • como não há necessidade de gravar os arquivos no disco, não consome disco
  • o processo todo ficou 657% mais rápido, simplesmente
  • ainda, o código é muito menor +_+

Veja ai:

#!/usr/bin/env python
# coding: utf-8

from lxml import html
from twisted.web import client
from twisted.python import log
from twisted.internet import task, defer, reactor

class MercadoLivre:
    def __str__(self):
        return 'http://www.mercadolivre.com.br/jm/ml.allcategs.AllCategsServlet'

    def parse_categories(self, content):
        category = subcategory = ''
        doc = html.fromstring(content)
        for link in doc.iterlinks():
            el, attr, href, offset = link
            try: category = el.find_class('categ')[0].text_content()
            except: pass
            else: continue
            if category:
                try: subcategory = el.find_class('seglnk')[0].text_content()
                except: continue
                else: yield (href, category, subcategory)

    def parse_subcategory(self, content):
        doc = html.fromstring(content)
        for element in doc.find_class('col_titulo'):
            yield element[0].text_content()

class Engine:
    def finish(self, result):
        reactor.stop()

    @defer.inlineCallbacks
    def fetch_categories(self, link, parser):
        try:
            doc = yield client.getPage(link)
            defer.returnValue(parser(doc))
        except Exception, e:
            print e

    def fetch_subcategory(self, links, parser, limit):
        coop = task.Cooperator()
        work = (client.getPage(link[0]).addCallback(parser).addCallback(self.page_items, *link) for link in links)
        result = defer.DeferredList([coop.coiterate(work) for x in xrange(limit)])
        result.addCallback(self.finish)
        result.addErrback(log.err)

    def page_items(self, items, href, category, subcategory):
        print 'Categoria: %s / %s' % (category.encode('utf-8'), subcategory.encode('utf-8'))
        for item in items: print ' -> %s' % item.encode('utf-8')
        print ''

def main(limit, *parsers):
    e = Engine()
    for parser in parsers:
        links = e.fetch_categories(str(parser), parser.parse_categories)
        links.addCallback(e.fetch_subcategory, parser.parse_subcategory, limit)

if __name__ == '__main__':
    reactor.callWhenRunning(main, 150, MercadoLivre())
    reactor.run()

autenticação no google, pro appengine

Google AppEngineAcabo de resolver um problema que vem me irritando muito nos últimos meses: sincronizar uma quande quantidade de dados com o appengine.

Depois de passar muita raiva com o bulkloader, que inevitavelmente acaba gerando esses Datastore Timeout, decidi fazer um outro esquema que funcionou muito melhor. Um dia coloco aqui, mas não agora.

O lance é que pra enviar os dados, habilitei o https e também autenticação. Assim, os blocos de dados podem ser sincronizados com segurança, apenas por um administrador da aplicação.

O problema foi fazer meu script se autenticar no Google Accounts. Apesar da API de autenticação do appengine, preferi não usá-la pois não há necessidade no meu caso, e optei por usar o formulário genérico do Google.

Encontrei este documento com alguns exemplos e dúvidas, mas não foi suficiente. Finalmente, entendi como o negócio funciona:

  • Faz um POST no serviço do Google Accounts enviando usuário, senha, nome de identificação e URL do app;
  • O Google Accounts irá fornecer um token, que então deve ser passado pro sistema de autenticação do próprio app (igual ao dev_appserver.py)
  • Nesse último request, o Google fornece um Cookie chamado ACSID, que deverá ser usado em todos as próximas requisições.

E pra resolver esse problema, escrevi uma classe chamada GoogleAuth, que recebe os dados necessários, se autentica no Google Accounts, e passa a fornecer o ACSID como string. Em caso de falha, tipo usuário ou senha errada, a classe gera um ValueError.

#!/usr/bin/env python
# coding: utf-8

import sys, urllib, urllib2

class GoogleAuth:
    cookie = ''

    # save the ACSID cookie before redirecting
    class RedirectHandler(urllib2.HTTPRedirectHandler):
        def __init__(self, klass): self.klass = klass
        def http_error_302(self, req, fp, code, msg, headers):
            try: self.klass.cookie = headers.get('Set-Cookie').split(';')[0]
            except: pass
            return urllib2.HTTPRedirectHandler.http_error_302(self, req, fp, code, msg, headers)

    # authenticate
    def __init__(self, appurl, appname, username, password):
        googauth = 'https://www.google.com/accounts/ClientLogin'

        # prepare the request data
        request_data = dict(
            Email=username,
            Passwd=password,
            source=appname,
            service='ah',
            accountType='HOSTED_OR_GOOGLE',
            )

        # get the token from google auth
        fd = urllib2.urlopen(googauth, urllib.urlencode(request_data))
        auth_dict = dict(x.split('=') for x in fd.read().split('\n') if x)
        fd.close()

        token = auth_dict.get('Auth')
        if auth_dict.get('Error') or not token:
            raise ValueError('authentication failed.')

        # get the ACSID cookie for further athenticated requests
        opener = urllib2.build_opener(self.RedirectHandler(self))
        fd = opener.open(appurl+'_ah/login?'+urllib.urlencode({'continue':appurl, 'auth':token}))
        fd.close()

    # return the cookie
    def __str__(self):
        return self.cookie

if __name__ == '__main__':
    appurl = 'http://myapp.appspot.com/'
    appname  = 'My App'
    username = 'foo'
    password = 'bar'

    # authenticate
    try: acsid = GoogleAuth(appurl, appname, username, password)
    except Exception, e:
        print str(e)
        sys.exit(1)

    # now access resources that require authentication...
    print 'authentication cookie:', acsid
    request = urllib2.Request(appurl, headers={'Cookie':acsid})
    chunk = urllib2.urlopen(request).read()

twisted crawler, alvo: mercadolivre

logo_mercadolivreJá pensou em fazer um programa que acessa o Mercado Livre, identifica o link de cada categoria, e extrai todos os produtos da primeira página de cada uma dessas categorias?

Pode até parecer complexo, mas não é. Esse programa existe, é simples, e está aqui, neste artigo, pronto pra você testar e modificar. :)

A algumas semanas tenho feito alguns crawlers pra esses sites que vendem produtos, como o Web Motors, Submarino, e até mesmo o eBay. A idéia é simplesmente extrair todos os produtos da primeira página de cada categoria, e montar uma base de dados com essas informações. O objetivo? Segredo de Estado.

Um dos problemas que encontrei ao fazer esse tipo de crawler, é controlar a concorrência de acesso. Se o programa acessa uma categoria por vez, demora uma eternidade pra baixar e processar cada página. Por outro lado, se ele acessa todas as categorias descontroladamente, o processo acaba criando muitos File Descriptors e isso causa diversos outros problemas pro sistema operacional – e não adianta falar em aumentar o limite usando ulimit -n, porque esses crawlers têm baixado milhões de links por dia.

A melhor maneira que encontrei pra solucionar esse problema, foi criando uma classe chamada “Controller”, que coloca as requisições em uma fila, baixa N páginas por vez, e ao invés de processá-las, simplesmente salva em um arquivo no disco.

Todos esses arquivos, com nomes únicos, são gerados usando um hash SHA1, e colocados em uma lista para serem processados offline, após o processo de download terminar.

O twisted tem um recurso muito interessante, que permite agrupar diversos deferreds em um único, e quando todos eles terminam, executa um callback. Nesse caso, quando todos os processos de baixar a página da subcategoria terminam, executa a função “offline”, que começa a processar cada uma, e extrair os produtos de lá.

Esta segunda etapa também poderia ser feita em paralelo, usando o twisted-parallels, mas decidi não colocar porque o código ficaria muito maior e você não teria tanta paciência pra entendê-lo. Usar um thread pool pra isso não valeria a pena, pois vale lembrar que o python usa o GIL, e I/O em thread só consome recurso e não aumenta em nada o desempenho.

A classe “MercadoLivre” tem 3 funções:

  • __str__: retorna o link com todas as categorias do Mercado Livre
  • parse_categories: um parser que retorna uma lista composta por tuples de (url, nome)
  • parse_subcategory: um parser pro conteúdo de cada categoria, que retorna uma lista com o título dos produtos anunciados lá

Todos esses parsers são baseados no BeautifulSoup 3.1. É necessário tê-lo instalado pra usar o programa.

Por fim, para usar este programa em outros sites, basta substituir a classe “MercadoLivre” pela sua própria, tipo “Submarino”.

Detalhe: essa porcaria de WordPress mostra a identação do código errada no artigo, mas se você clicar em “view plain”, poderá copiar e colar o código correto. Chame o suporte!

Aqui o código (muito belo, por sinal…):

#!/usr/bin/env python
# coding: utf-8

import os, re, sys
import shutil, hashlib
from Queue import Queue
from BeautifulSoup import BeautifulSoup

# twisted
from twisted.web import client
from twisted.internet import defer, threads, reactor

class MercadoLivre:
noscript = re.compile(r”(?is)]*>(.*?)“)

def __str__(self):
return ‘http://www.mercadolivre.com.br/jm/ml.allcategs.AllCategsServlet’

def parse_categories(self, content):
catlist = {}
cleanup = lambda s: s.replace(‘\n’, ”).strip()

# parse the document
soup = BeautifulSoup(content)

# find all categories and their items
current = ”
for item in soup.findAll(‘a’, {‘class’:[‘categ’,’seglnk’]}):
text = cleanup(item.contents[0])
attrs = dict(item.attrs)
if attrs.get(u’class’) == u’categ’:
current = text
catlist[current] = []
else:
catlist[current].append((attrs.get(u’href’, ”), text))

# return the list of categories and their items
return catlist

def parse_subcategory(self, content):
result = []
soup = BeautifulSoup(self.noscript.sub(”, content),
convertEntities=BeautifulSoup.HTML_ENTITIES)

for item in soup.findAll(‘div’, {‘class’:’col_titulo’}):
result.append(item.find(‘a’).contents[0].strip())

return result

class Controller:
def __init__(self, fetch):
self.count = 0
self.limit = fetch
self.queue = Queue()
self.tmpdir = ‘/tmp/mercadolivre.%d’ % os.getpid()
self.dispatch()

def encode(self, text):
return unicode(text).encode(‘utf-8’, ‘replace’)

def getPage(self, url, *args, **kwargs):
d = defer.Deferred()
self.queue.put((d, url, args, kwargs))
return d

def dispatch(self):
while True:
try:
assert self.count < self.limit d, url, args, kwargs = self.queue.get_nowait() except: break self.count += 1 deferred = client.getPage(url, *args, **kwargs) deferred.addBoth(self.decrease_count) deferred.chainDeferred(d) reactor.callLater(1, self.dispatch) def decrease_count(self, result): self.count -= 1 return result class main(Controller): def __init__(self, fetch, parser): # set the concurrent download limit for the crawler Controller.__init__(self, fetch) # fetch the main categories page self.total = 0 self.files = [] self.parser = parser d = self.getPage(str(parser), timeout=60) d.addCallback(self.fetch_categories) d.addErrback(self.error_categories) def error_categories(self, error): # hmmm... fatal error, cannot continue print 'cannot fetch categories from %s: %s' % (str(self.parser), str(error)) reactor.stop() def error_subcategory(self, error, href, category, subcategory): # problem fetching subcategory contents... print 'error "%s / %s": [%s] %s' % (category, subcategory, href, error.value) def fetch_categories(self, content): # parse the contents in a thread reactor.callInThread(self.parse_categories, content) def parse_categories(self, content): try: categories = self.parser.parse_categories(content) except Exception, e: print 'error parsing categories: %s' % str(e) reactor.stop() return print 'going to fetch %d categories...' % len(categories) tasks = [] for category, contents in categories.items(): category = self.encode(category) for href, subcategory in contents: href, subcategory = self.encode(href), self.encode(subcategory) d = self.getPage(href, timeout=60) d.addCallback(self.save_subcategory, href, category, subcategory) d.addErrback(self.error_subcategory, href, category, subcategory) tasks.append(d) # call the offline subcategory parser after downloading everything... d = defer.gatherResults(tasks) d.addCallback(self.offline) def save_subcategory(self, contents, href, category, subcategory): # create the tmpdir if it doesn't exist if not os.path.exists(self.tmpdir): os.mkdir(self.tmpdir) # create a unique hash for each category hash = hashlib.new('sha1', category+subcategory).hexdigest() filename = os.path.join(self.tmpdir, hash+'.dump') try: fd = open(filename, 'w') fd.write(contents) fd.close() except: return self.files.append((href, category, subcategory, filename)) print 'saving "%s / %s": %s' % (category, subcategory, hash) def offline(self, null): # start processing each subcategory... reactor.stop() for item in self.files: href, category, subcategory, filename = item sys.stdout.write('parsing "%s / %s": ' % (category, subcategory)) sys.stdout.flush() fd = open(filename) try: results = self.parser.parse_subcategory(fd.read()) except Exception, e: print 'error! %s' % str(e) continue lr = len(results) self.total += lr print '%d items' % lr for result in results: print ' ' + self.encode(result) print '\n%d items processed. cleaning up!' % self.total shutil.rmtree(self.tmpdir) if __name__ == '__main__': reactor.callWhenRunning(main, 10, MercadoLivre()) reactor.run() [/sourcecode] Agora, é só você se dedicar e fazer isso pros sites que te interessam. Notas?


paralelismo: python e twisted

parent-childExecutar algumas tarefas em paralelo no python, usando threads, pode ser problemático, especialmente quando há I/O envolvido.

Apesar do interpretador do python usar as threads nativas do sistema operacional (na maioria dos *nix, pthreads), ele não é 100% thread-safe. Isso não é um bug, mas sim um recurso usado para evitar travamento ou computação incorreta de valores.

Do texto original:

The Python interpreter is not fully thread safe. In order to support multi-threaded Python programs, there’s a global lock, called the global interpreter lock or GIL, that must be held by the current thread before it can safely access Python objects. Without the lock, even the simplest operations could cause problems in a multi-threaded program: for example, when two threads simultaneously increment the reference count of the same object, the reference count could end up being incremented only once instead of twice.

Therefore, the rule exists that only the thread that has acquired the global interpreter lock may operate on Python objects or call Python/C API functions. In order to support multi-threaded Python programs, the interpreter regularly releases and reacquires the lock — by default, every 100 bytecode instructions (this can be changed with sys.setcheckinterval()). The lock is also released and reacquired around potentially blocking I/O operations like reading or writing a file, so that other threads can run while the thread that requests the I/O is waiting for the I/O operation to complete.

Do outro lado, temos o Twisted. Para mim, o Twisted é uma super biblioteca de I/O, com todos os recursos necessários para tratar tarefas de modo assíncrono (non-blocking), considerando ainda aquelas que bloqueiam a execução por determinado tempo (blocking).

Internamente, o Twisted mantém um thread pool para executar as tarefas síncronas, como por exemplo executar um INSERT em banco de dados. Para tal, existem as funções threads.deferToThread e reactor.callInThread.

Porém, se considerarmos o GIL, mencionado acima, temos como resultado um belo problema: enquanto uma tarefa que necessita I/O síncrono está sendo executada em uma thread, o interpretador do python fica bloqueado naquela operação e todo o resto fica parado. Isso causa uma perda de desempenho sem tamanho, e diversos outros efeitos colaterais (como time-out em sockets, etc).

Solucionar o problema de paralelismo no python não é tão complicado, mas também não é tão simples. Existem dois módulos que conheço, que fazem isso: pyprocessing e multiprocessing.

Ambos usam uma API similar à do módulo threading, mas ao invés de threads, criam processos usando fork(), que por sua vez, executam outro interpretador do python e se livram dos efeitos do GIL em um único processo. Mas, quando usados em conjunto com o Twisted, necessitam diversas adaptações para a comunicação entre os processos pai e filhos, pois essa comunicação é feita através de pipes.

Com tudo isso junto, o GIL começa a se tornar um problema e a coisa toda já se parece com uma grande confusão, que para muitos já parece não ter solução decente. Em suma, é o seguinte: escrever programas que necessitam parelelismo em python exige usar pyprocessing ou multiprocessing, mas quando o programa é inteiro assíncrono usando Twisted, tudo fica complicado.

Isso obviamente aconteceu comigo em um sistema relativamente grande, que precisava executar a classificação de alguns dados em paralelo, e quando usava o thread pool do Twisted, o processo inteiro ficava lento devido à grande quantidade de I/O para ler e gravar arquivos no disco. Era o GIL me atrapalhando.

Para solucionar esse problema, escrevi um módulo que usa a função spawnProcess do próprio Twisted, e automaticamente trata do pipe entre os processos pai e filho. Ainda, escrevi um protocolo de comunicação entre eles que permite transmitir e receber dados entre os processos de modo transparente, de maneira assíncrona.

Ainda, esse módulo possui uma classe que cria um pool de processos (não threads) para onde é possível despachar dados para serem processados em paralelo, e aguardar pelo resultado em um deferred, seguindo todo o padrão do Twisted.

Depois de executar todos os testes necessários e deixar o código estável, implementei isso no meu sistema e resolvi todo o problema do paralelismo de maneira simples e elegante, sem nenhuma gambiarra nem esquisitisse.

Denominado Twisted-Parallels, o módulo foi liberado sob a GPL v2 e está disponível no Google Code, com alguns exemplos de utilização.


freeswitch, eventsocket, twisted!

freeswitchApós meses tentando resolver problemas ridículos com meu discador automático baseado em Asterisk, acabei mudando totalmente o rumo do sistema e re-escrevi todo o código para o FreeSWITCH.

A versão anterior se conectava no manager do Asterisk, originava as chamadas, e naquela completa bagunça sem padrão ia controlando o fluxo da chamada com auxílio de um AGI. Funções como transferência e gravação com detecção de caixa postal geravam diversas complicações devido à maneira como o Asterisk funciona, e o resultado era muita dor de cabeça.

Agora, a bagunça é assíncrona, tem padrão, e controla totalmente o FreeSWITCH através do Event Socket. Ao se conectar no Inbound Socket, o sistema origina as chamadas com destino ao Outbound Socket, onde cada uma é controlada individualmente e todos recursos funcionam perfeitamente, inclusive as várias maneiras de realizar transferência (bridge).

O FreeSWITCH está próximo da perfeição no quesito funcionamento, pecando apenas na documentação e alguns detalhes relacionados ao retorno (erro, etc) na originação de chamadas. Porém, esses detalhes são totalmente contornáveis com soluções simples, sem gambiarras e maracutaias como era com o Asterisk.

Em uma máquina comum, consigo manter 60 ligações simultâneas consumindo apenas 3% de cpu time. E o FreeSWITCH, que faz todo o controle das chamadas – conversão de mídia, gravação, detecção de caixa postal, de fax, etc etc etc, consome apenas 30% de cpu time.

Tudo isso é feito usando um protocolo do Twisted pro Event Socket do FreeSWITCH, que escrevi no último final de semana – entre carnaval, rock, etc. E pra felicidade geral da nação, tornei público pela lisença GPLv2, disponível no Google Code com exemplos.