tornado

22 09 2009

Logo após o Facebook comprar o FriendFeed por meros U$50 milhões, não demorou pra começarem a publicar software gratuito.

O primeiro da fila foi o servidor e framework web usado pelo FriendFeed, chamado Tornado.

O FriendFeed é famoso por sua integração com Facebook, Twitter e Google. O que ninguém sabia, é que boa parte dessa integração é feita direto no servidor e framework web. Ainda, um dos recursos mais legais é o stream de dados do servidor assíncrono, também conhecido como comet, que permite long polling consumindo o mínimo de recursos.

É interessante como nos últimos anos essas tecnologias avançam no ambiente web. Na década de 90, a coisa mais moderna era desenvolver um CGI isolado, em Perl, que era executado pelo Apache, que por sua vez distribuía as requisições para um de seus processos usando um pool.

Depois, o procedimento de executar um programa externo foi se tornando um pouco ultrapassado, e o CGI foi parar dentro do Apache, com nomes como PHP e Coldfusion. Nessa época, enfiaram até Java dentro do Apache, colocaram o nome de Tomcat nele, e os CGIs passaram a se chamar Servlets.

Anos depois, fizeram o mesmo com o Python, e colocaram lá dentro do Apache o mod_python. No caso específico do Python, foram ainda mais longe: criaram um protocolo padrão para aplicativos web se comunicar com servidores web, chamado WSGI, e também foi parar dentro do Apache com nome de mod_wsgi.

Me lembro de cada uma dessas etapas, das dificuldades e dos benefícios de cada uma dessas adaptações. Programei em todas essas linguagens, nos últimos 15 anos. Meia vida.

Mais recentemente, antigas tecnologias como o FastCGI reabriram o caminho para aplicativos web 2.0. Servidores mais leves que o Apache, como o lighttpd e nginx começaram a ganhar muito espaço em sites com grande volume de acesso, por consumirem menos recursos trabalhando de forma assíncrona.

Mas antes do Apache, já existiam diversos serviços de rede que usavam o conceito de conexão assíncrona. Os servidores de irc, por exemplo, já formavam redes com centenas, e depois milhares, e depois centenas de milhares de usuários simultâneos, no bate-papo infinito.

Quanto mais o ambiente web cresce, mais pessoas acessam sites. Esses sites precisam de mais servidores, e esses servidores precisam de mais recursos. Usar conexões assíncronas permite aceitar mais pessoas simultâneas, consumindo menos recursos que tecnologias baseadas em thread, por exemplo. É uma espécie de redução de custo com otimização de recurso, ao mesmo tempo. Para programadores, é um conceito bem diferente do tradicional, especialmente para web, e exige um pouco de cérebro para se adaptar.

Os sistemas operacionais já vêm evoluindo suas internas para aceitar mais e mais conexões simultâneas, consumindo menos e menos recursos. Nomes como epoll, kqueue e /dev/poll estão cada vez mais no dia-a-dia dos programadores. E facilitadores como a libevent também.

O resultado disso é que os novos frameworks para aplicativos web já possuem servidores web embutidos, como é o caso do Tornado, Twisted Web, Merb, etc etc… Na frente deles, lighttpd e nginx fazem a distribuição de carga, e milhares de conexões web simultâneas, em um único computador físico, começa a se tornar realidade.





mais sobre crawlers e spiders

9 09 2009

logo_mercadolivreNo mês passado escrevi um artigo com um programa para capturar todos os items da primeira página de cada categoria do MercadoLivre.

Lá, lidava com alguns problemas como:

  • limite de concorrência no download das páginas
  • processamento de html em thread, síncrono
  • manter a maior parte do processo assíncrono, para ganhar tempo e CPU

Depois disso, precisei fazer umas alterações no código e acabei modificando um pouco programa, usando outras técnicas como:

Cada item desta lista corresponde aos itens da lista mais acima, respectivamente.

O esquema de cooperação do twisted é muito melhor que o Controller que havia criado anteriormente. Porém, muito mais complicado para jovens aprendizes. Recomendo este link para mais detalhes.

Sobre o processamento do html, vale a pena verificar o lxml. Antes, havia usado BeautifulSoup, que é muito bom, mas perde violentamente em desempenho e suporte a broken-html.

Por fim, o truque de usar generators para executar alguns callbacks inline é incrível, e absurdamente prático em casos como esse, do programa abaixo.

O resultado é final é o mesmo, mas a melhoria em desempenho é absurda. Fiz alguns testes na minha máquina e obtive o seguinte:

  • esta versão consome, em média, 20% menos de CPU
  • como não há necessidade de gravar os arquivos no disco, não consome disco
  • o processo todo ficou 657% mais rápido, simplesmente
  • ainda, o código é muito menor +_+

Veja ai:

#!/usr/bin/env python
# coding: utf-8

from lxml import html
from twisted.web import client
from twisted.python import log
from twisted.internet import task, defer, reactor

class MercadoLivre:
    def __str__(self):
        return 'http://www.mercadolivre.com.br/jm/ml.allcategs.AllCategsServlet'

    def parse_categories(self, content):
        category = subcategory = ''
        doc = html.fromstring(content)
        for link in doc.iterlinks():
            el, attr, href, offset = link
            try: category = el.find_class('categ')[0].text_content()
            except: pass
            else: continue
            if category:
                try: subcategory = el.find_class('seglnk')[0].text_content()
                except: continue
                else: yield (href, category, subcategory)

    def parse_subcategory(self, content):
        doc = html.fromstring(content)
        for element in doc.find_class('col_titulo'):
            yield element[0].text_content()

class Engine:
    def finish(self, result):
        reactor.stop()

    @defer.inlineCallbacks
    def fetch_categories(self, link, parser):
        try:
            doc = yield client.getPage(link)
            defer.returnValue(parser(doc))
        except Exception, e:
            print e

    def fetch_subcategory(self, links, parser, limit):
        coop = task.Cooperator()
        work = (client.getPage(link[0]).addCallback(parser).addCallback(self.page_items, *link) for link in links)
        result = defer.DeferredList([coop.coiterate(work) for x in xrange(limit)])
        result.addCallback(self.finish)
        result.addErrback(log.err)

    def page_items(self, items, href, category, subcategory):
        print 'Categoria: %s / %s' % (category.encode('utf-8'), subcategory.encode('utf-8'))
        for item in items: print ' -> %s' % item.encode('utf-8')
        print ''

def main(limit, *parsers):
    e = Engine()
    for parser in parsers:
        links = e.fetch_categories(str(parser), parser.parse_categories)
        links.addCallback(e.fetch_subcategory, parser.parse_subcategory, limit)

if __name__ == '__main__':
    reactor.callWhenRunning(main, 150, MercadoLivre())
    reactor.run()




novo serviço de geo localização

1 09 2009

freegeoipnetAos poucos, vou tirando meu serviço de Geo Localização do ar. Apesar de bastante acesso por dia no freegeoip.appspot.com, os problemas com o datastore do GAE me desanimam.

O serviço está no ar há cerca de 1 ano, e a média de acesso é de 10 e 15 requests por segundo. A recente redução das cotas tiram o serviço do ar muitas vezes, e isso porque diminuí ao máximo o nível de processamento, e a maioria das buscas é feita em memória direto no memcache.

freegeoip-quotas
Apesar do sistema ainda funcionar, não tenho mais paciência pra atualizar os dados lá. O datastore não permite deletar os dados antigos de uma maneira simples, e por isso gasta muito processamento quando uso o bulkloader, ou mesmo a remote_api.

Cansado de todos esses problemas, acabei pegando um VPS no slicehost.com e fiz uma versão nova do sistema. Tornei o código fonte disponível pela GPL v2, e daqui um tempo vou redirecionar o serviço do GAE pro slicehost.

Além de rodar mais rápido, é muito mais fácil pra atualizar o banco de dados de IPs, e tenho shell na máquina.

Se você costumava usar o serviço antigo, passe a usar o novo:

http://freegeoip.net

Espero que em breve possa atualizar meu banco de dados usando a Geolocation API que está sendo definida pelo W3C.

Sobre o slicehost: é o melhor serviço de hosting que já encontrei, por um preço ótimo. O formulário de signup é bem simples, e a configuração do ambiente é incrível. Ainda, permitem adicionar seus domínios direto no DNS deles, e apontar pras máquinas virtuais.





autenticação no google, pro appengine

20 08 2009

Google AppEngineAcabo de resolver um problema que vem me irritando muito nos últimos meses: sincronizar uma quande quantidade de dados com o appengine.

Depois de passar muita raiva com o bulkloader, que inevitavelmente acaba gerando esses Datastore Timeout, decidi fazer um outro esquema que funcionou muito melhor. Um dia coloco aqui, mas não agora.

O lance é que pra enviar os dados, habilitei o https e também autenticação. Assim, os blocos de dados podem ser sincronizados com segurança, apenas por um administrador da aplicação.

O problema foi fazer meu script se autenticar no Google Accounts. Apesar da API de autenticação do appengine, preferi não usá-la pois não há necessidade no meu caso, e optei por usar o formulário genérico do Google.

Encontrei este documento com alguns exemplos e dúvidas, mas não foi suficiente. Finalmente, entendi como o negócio funciona:

  • Faz um POST no serviço do Google Accounts enviando usuário, senha, nome de identificação e URL do app;
  • O Google Accounts irá fornecer um token, que então deve ser passado pro sistema de autenticação do próprio app (igual ao dev_appserver.py)
  • Nesse último request, o Google fornece um Cookie chamado ACSID, que deverá ser usado em todos as próximas requisições.

E pra resolver esse problema, escrevi uma classe chamada GoogleAuth, que recebe os dados necessários, se autentica no Google Accounts, e passa a fornecer o ACSID como string. Em caso de falha, tipo usuário ou senha errada, a classe gera um ValueError.

#!/usr/bin/env python
# coding: utf-8

import sys, urllib, urllib2

class GoogleAuth:
    cookie = ''

    # save the ACSID cookie before redirecting
    class RedirectHandler(urllib2.HTTPRedirectHandler):
        def __init__(self, klass): self.klass = klass
        def http_error_302(self, req, fp, code, msg, headers):
            try: self.klass.cookie = headers.get('Set-Cookie').split(';')[0]
            except: pass
            return urllib2.HTTPRedirectHandler.http_error_302(self, req, fp, code, msg, headers)

    # authenticate
    def __init__(self, appurl, appname, username, password):
        googauth = 'https://www.google.com/accounts/ClientLogin'

        # prepare the request data
        request_data = dict(
            Email=username,
            Passwd=password,
            source=appname,
            service='ah',
            accountType='HOSTED_OR_GOOGLE',
            )

        # get the token from google auth
        fd = urllib2.urlopen(googauth, urllib.urlencode(request_data))
        auth_dict = dict(x.split('=') for x in fd.read().split('\n') if x)
        fd.close()

        token = auth_dict.get('Auth')
        if auth_dict.get('Error') or not token:
            raise ValueError('authentication failed.')

        # get the ACSID cookie for further athenticated requests
        opener = urllib2.build_opener(self.RedirectHandler(self))
        fd = opener.open(appurl+'_ah/login?'+urllib.urlencode({'continue':appurl, 'auth':token}))
        fd.close()

    # return the cookie
    def __str__(self):
        return self.cookie

if __name__ == '__main__':
    appurl = 'http://myapp.appspot.com/'
    appname  = 'My App'
    username = 'foo'
    password = 'bar'

    # authenticate
    try: acsid = GoogleAuth(appurl, appname, username, password)
    except Exception, e:
        print str(e)
        sys.exit(1)

    # now access resources that require authentication...
    print 'authentication cookie:', acsid
    request = urllib2.Request(appurl, headers={'Cookie':acsid})
    chunk = urllib2.urlopen(request).read()




twisted crawler, alvo: mercadolivre

19 08 2009

logo_mercadolivreJá pensou em fazer um programa que acessa o Mercado Livre, identifica o link de cada categoria, e extrai todos os produtos da primeira página de cada uma dessas categorias?

Pode até parecer complexo, mas não é. Esse programa existe, é simples, e está aqui, neste artigo, pronto pra você testar e modificar. :)

A algumas semanas tenho feito alguns crawlers pra esses sites que vendem produtos, como o Web Motors, Submarino, e até mesmo o eBay. A idéia é simplesmente extrair todos os produtos da primeira página de cada categoria, e montar uma base de dados com essas informações. O objetivo? Segredo de Estado.

Um dos problemas que encontrei ao fazer esse tipo de crawler, é controlar a concorrência de acesso. Se o programa acessa uma categoria por vez, demora uma eternidade pra baixar e processar cada página. Por outro lado, se ele acessa todas as categorias descontroladamente, o processo acaba criando muitos File Descriptors e isso causa diversos outros problemas pro sistema operacional – e não adianta falar em aumentar o limite usando ulimit -n, porque esses crawlers têm baixado milhões de links por dia.

A melhor maneira que encontrei pra solucionar esse problema, foi criando uma classe chamada “Controller”, que coloca as requisições em uma fila, baixa N páginas por vez, e ao invés de processá-las, simplesmente salva em um arquivo no disco.

Todos esses arquivos, com nomes únicos, são gerados usando um hash SHA1, e colocados em uma lista para serem processados offline, após o processo de download terminar.

O twisted tem um recurso muito interessante, que permite agrupar diversos deferreds em um único, e quando todos eles terminam, executa um callback. Nesse caso, quando todos os processos de baixar a página da subcategoria terminam, executa a função “offline”, que começa a processar cada uma, e extrair os produtos de lá.

Esta segunda etapa também poderia ser feita em paralelo, usando o twisted-parallels, mas decidi não colocar porque o código ficaria muito maior e você não teria tanta paciência pra entendê-lo. Usar um thread pool pra isso não valeria a pena, pois vale lembrar que o python usa o GIL, e I/O em thread só consome recurso e não aumenta em nada o desempenho.

A classe “MercadoLivre” tem 3 funções:

  • __str__: retorna o link com todas as categorias do Mercado Livre
  • parse_categories: um parser que retorna uma lista composta por tuples de (url, nome)
  • parse_subcategory: um parser pro conteúdo de cada categoria, que retorna uma lista com o título dos produtos anunciados lá

Todos esses parsers são baseados no BeautifulSoup 3.1. É necessário tê-lo instalado pra usar o programa.

Por fim, para usar este programa em outros sites, basta substituir a classe “MercadoLivre” pela sua própria, tipo “Submarino”.

Detalhe: essa porcaria de WordPress mostra a identação do código errada no artigo, mas se você clicar em “view plain”, poderá copiar e colar o código correto. Chame o suporte!

Aqui o código (muito belo, por sinal…):

#!/usr/bin/env python
# coding: utf-8

import os, re, sys
import shutil, hashlib
from Queue import Queue
from BeautifulSoup import BeautifulSoup

# twisted
from twisted.web import client
from twisted.internet import defer, threads, reactor

class MercadoLivre:
    noscript = re.compile(r"(?is)<script[^>]*>(.*?)</script>")

    def __str__(self):
	return 'http://www.mercadolivre.com.br/jm/ml.allcategs.AllCategsServlet'

    def parse_categories(self, content):
	catlist = {}
	cleanup = lambda s: s.replace('\n', '').strip()

	# parse the document
	soup = BeautifulSoup(content)

	# find all categories and their items
	current = ''
	for item in soup.findAll('a', {'class':['categ','seglnk']}):
	    text = cleanup(item.contents[0])
	    attrs = dict(item.attrs)
	    if attrs.get(u'class') == u'categ':
		current = text
		catlist[current] = []
	    else:
		catlist[current].append((attrs.get(u'href', ''), text))

	# return the list of categories and their items
	return catlist

    def parse_subcategory(self, content):
	result = []
	soup = BeautifulSoup(self.noscript.sub('', content),
	    convertEntities=BeautifulSoup.HTML_ENTITIES)

	for item in soup.findAll('div', {'class':'col_titulo'}):
	    result.append(item.find('a').contents[0].strip())

	return result

class Controller:
    def __init__(self, fetch):
	self.count = 0
	self.limit = fetch
	self.queue = Queue()
	self.tmpdir = '/tmp/mercadolivre.%d' % os.getpid()
	self.dispatch()

    def encode(self, text):
	return unicode(text).encode('utf-8', 'replace')

    def getPage(self, url, *args, **kwargs):
	d = defer.Deferred()
	self.queue.put((d, url, args, kwargs))
	return d

    def dispatch(self):
	while True:
	    try:
		assert self.count < self.limit
		d, url, args, kwargs = self.queue.get_nowait()
	    except: break
	    self.count += 1
	    deferred = client.getPage(url, *args, **kwargs)
	    deferred.addBoth(self.decrease_count)
	    deferred.chainDeferred(d)
	reactor.callLater(1, self.dispatch)

    def decrease_count(self, result):
	self.count -= 1
	return result

class main(Controller):
    def __init__(self, fetch, parser):
	# set the concurrent download limit for the crawler
	Controller.__init__(self, fetch)

	# fetch the main categories page
	self.total = 0
	self.files = []
	self.parser = parser
	d = self.getPage(str(parser), timeout=60)
	d.addCallback(self.fetch_categories)
	d.addErrback(self.error_categories)

    def error_categories(self, error):
	# hmmm... fatal error, cannot continue
	print 'cannot fetch categories from %s: %s' % (str(self.parser), str(error))
	reactor.stop()

    def error_subcategory(self, error, href, category, subcategory):
	# problem fetching subcategory contents...
	print 'error "%s / %s": [%s] %s' % (category, subcategory, href, error.value)

    def fetch_categories(self, content):
	# parse the contents in a thread
	reactor.callInThread(self.parse_categories, content)

    def parse_categories(self, content):
	try: categories = self.parser.parse_categories(content)
	except Exception, e:
	    print 'error parsing categories: %s' % str(e)
	    reactor.stop()
	    return

	print 'going to fetch %d categories...' % len(categories)
	tasks = []
	for category, contents in categories.items():
	    category = self.encode(category)
	    for href, subcategory in contents:
		href, subcategory = self.encode(href), self.encode(subcategory)
		d = self.getPage(href, timeout=60)
		d.addCallback(self.save_subcategory, href, category, subcategory)
		d.addErrback(self.error_subcategory, href, category, subcategory)
		tasks.append(d)

	# call the offline subcategory parser after downloading everything...
	d = defer.gatherResults(tasks)
	d.addCallback(self.offline)

    def save_subcategory(self, contents, href, category, subcategory):
	# create the tmpdir if it doesn't exist
	if not os.path.exists(self.tmpdir): os.mkdir(self.tmpdir)

	# create a unique hash for each category
	hash = hashlib.new('sha1', category+subcategory).hexdigest()
	filename = os.path.join(self.tmpdir, hash+'.dump')
	try:
	    fd = open(filename, 'w')
	    fd.write(contents)
	    fd.close()
	except: return
	self.files.append((href, category, subcategory, filename))
	print 'saving "%s / %s": %s' % (category, subcategory, hash)

    def offline(self, null):
	# start processing each subcategory...
	reactor.stop()
	for item in self.files:
	    href, category, subcategory, filename = item
	    sys.stdout.write('parsing "%s / %s": ' % (category, subcategory))
	    sys.stdout.flush()
	    fd = open(filename)
	    try: results = self.parser.parse_subcategory(fd.read())
	    except Exception, e:
		print 'error! %s' % str(e)
		continue

	    lr = len(results)
	    self.total += lr
	    print '%d items' % lr
	    for result in results:
		print '  ' + self.encode(result)

	print '\n%d items processed. cleaning up!' % self.total
	shutil.rmtree(self.tmpdir)

if __name__ == '__main__':
    reactor.callWhenRunning(main, 10, MercadoLivre())
    reactor.run()

Agora, é só você se dedicar e fazer isso pros sites que te interessam.

Notas?





skype no iphone 3g

17 08 2009

skype-logoDepois de tanto tempo sem escrever nada, volto logo com uma boa: fazer chamadas de voz por SIP ou Skype, no iPhone, usando a rede 3G !

Aproveitando o assunto (iPhone), vale lembrar que tem apenas duas coisas que me irritam profundamente (até agora) com este telefone:

  1. Não permitir SIP/VoIP/Skype pela rede 3G (olho gordo das operadoras)
  2. Não ter suporte a Flash (idiotice da Apple por causa dos jogos)

De resto, pra quem curte Interweb, é o melhor aparelho que existe.

Voltando ao assunto… a primeira coisa necessária pra ignorar as restrições de SIP/VoIP/Skype apenas pela conexão WiFi é fazer o jailbreak do telefone. O melhor programa que encontrei pra fazer isso, à partir do Mac OS X, é o PwnageTool – não sei o que tem pra Linux nem Windows, não me xinguem.

O jailbreak irá instalar o Cydia automaticamente, que é um front-end pro APT, exatamente igual ao do Debian.

Depois disso é necessário conectar o iPhone a uma rede WiFi, saber o endereço IP dele, e pelo Cydia, instalar os pacotes OpenSSH, APT7 (Strict) e Bigboss Recommended Tools (este irá prover tudo que interessa: ps, vim, etc…).

Com isso feito, você precisa conectar via SSH no iPhone, usando o seu programa preferido: ssh no console, ou PuTTY pros usuários de Windows. Pra conectar no iPhone, o usuário é root e a senha padrão é alpine.

No console, é necessário instalar dois pacotes:

  • mobilesubstrate : resumindo, insere código em um app durante sua execução
  • voipoverip3g: biblioteca do mobilesubstrate pra enganar o iPhone, assim os programar não sabem que estão usando 3G, e pensam que é WiFi (duh!)

Para instalar, o procedimento é padrão de debian/ubuntu, mas no console do iPhone:

apt-get install mobilesubstrate voipoverip3g

Com isso feito, reinicie o iPhone, re-instale o Skype, e seja feliz. Quando abrir o Skype pela primeira vez, ele vai reclamar que “não foi feito pra rodar em iPhoneOS modificado”, mas é “apenas uma mensagem”.

Pros mais interessados: o pacote voipoverip3g instala esse “plugin” pro mobilesubtrate, e sua configuração fica em

/Library/MobileSubstrate/DynamicLibraries/VoIPover3G.plist

Você pode adicionar suporte a qualquer programa que seja restrito ao uso de SIP/VoIP pela rede WiFi, basta editar com vim e adicionar a chave do programa, seguindo o formato do arquivo. O padrão é adicionar suporte ao Fring e Skype.

Este é o conteúdo original do arquivo:

Bozo:~ root# cat /Library/MobileSubstrate/DynamicLibraries/VoIPover3G.plist
Filter = {Bundles = ("com.Fringland.Fring", "com.skype.skype");};

Boa?





paralelismo: python e twisted

4 05 2009

parent-childExecutar algumas tarefas em paralelo no python, usando threads, pode ser problemático, especialmente quando há I/O envolvido.

Apesar do interpretador do python usar as threads nativas do sistema operacional (na maioria dos *nix, pthreads), ele não é 100% thread-safe. Isso não é um bug, mas sim um recurso usado para evitar travamento ou computação incorreta de valores.

Do texto original:

The Python interpreter is not fully thread safe. In order to support multi-threaded Python programs, there’s a global lock, called the global interpreter lock or GIL, that must be held by the current thread before it can safely access Python objects. Without the lock, even the simplest operations could cause problems in a multi-threaded program: for example, when two threads simultaneously increment the reference count of the same object, the reference count could end up being incremented only once instead of twice.

Therefore, the rule exists that only the thread that has acquired the global interpreter lock may operate on Python objects or call Python/C API functions. In order to support multi-threaded Python programs, the interpreter regularly releases and reacquires the lock — by default, every 100 bytecode instructions (this can be changed with sys.setcheckinterval()). The lock is also released and reacquired around potentially blocking I/O operations like reading or writing a file, so that other threads can run while the thread that requests the I/O is waiting for the I/O operation to complete.

Do outro lado, temos o Twisted. Para mim, o Twisted é uma super biblioteca de I/O, com todos os recursos necessários para tratar tarefas de modo assíncrono (non-blocking), considerando ainda aquelas que bloqueiam a execução por determinado tempo (blocking).

Internamente, o Twisted mantém um thread pool para executar as tarefas síncronas, como por exemplo executar um INSERT em banco de dados. Para tal, existem as funções threads.deferToThread e reactor.callInThread.

Porém, se considerarmos o GIL, mencionado acima, temos como resultado um belo problema: enquanto uma tarefa que necessita I/O síncrono está sendo executada em uma thread, o interpretador do python fica bloqueado naquela operação e todo o resto fica parado. Isso causa uma perda de desempenho sem tamanho, e diversos outros efeitos colaterais (como time-out em sockets, etc).

Solucionar o problema de paralelismo no python não é tão complicado, mas também não é tão simples. Existem dois módulos que conheço, que fazem isso: pyprocessing e multiprocessing.

Ambos usam uma API similar à do módulo threading, mas ao invés de threads, criam processos usando fork(), que por sua vez, executam outro interpretador do python e se livram dos efeitos do GIL em um único processo. Mas, quando usados em conjunto com o Twisted, necessitam diversas adaptações para a comunicação entre os processos pai e filhos, pois essa comunicação é feita através de pipes.

Com tudo isso junto, o GIL começa a se tornar um problema e a coisa toda já se parece com uma grande confusão, que para muitos já parece não ter solução decente. Em suma, é o seguinte: escrever programas que necessitam parelelismo em python exige usar pyprocessing ou multiprocessing, mas quando o programa é inteiro assíncrono usando Twisted, tudo fica complicado.

Isso obviamente aconteceu comigo em um sistema relativamente grande, que precisava executar a classificação de alguns dados em paralelo, e quando usava o thread pool do Twisted, o processo inteiro ficava lento devido à grande quantidade de I/O para ler e gravar arquivos no disco. Era o GIL me atrapalhando.

Para solucionar esse problema, escrevi um módulo que usa a função spawnProcess do próprio Twisted, e automaticamente trata do pipe entre os processos pai e filho. Ainda, escrevi um protocolo de comunicação entre eles que permite transmitir e receber dados entre os processos de modo transparente, de maneira assíncrona.

Ainda, esse módulo possui uma classe que cria um pool de processos (não threads) para onde é possível despachar dados para serem processados em paralelo, e aguardar pelo resultado em um deferred, seguindo todo o padrão do Twisted.

Depois de executar todos os testes necessários e deixar o código estável, implementei isso no meu sistema e resolvi todo o problema do paralelismo de maneira simples e elegante, sem nenhuma gambiarra nem esquisitisse.

Denominado Twisted-Parallels, o módulo foi liberado sob a GPL v2 e está disponível no Google Code, com alguns exemplos de utilização.





consulta geográfica de endereço ip

26 03 2009

Google AppEngineQuem nunca precisou nem procurou por isso, é porque não sabe que é possível. Um tempo atrás, precisei desse tipo de consulta mas não encontrei nada gratuito, que fosse “ilimitado”. Normalmente, os sites que possuem esse serviço, gratuito, colocam cotas diárias e oferecem um plano pago.

Esses dias o Gleicon me passou um link, de um cara que teve a mesma necessidade, o mesmo problema, mas arrumou a solução: sabe-se lá como, ele “criou” uma base com todas as informações. No blog dele, tornou disponível gratuitamente, em dois formatos.

Um deles, é um dump do MySQL que qualquer um pode importar; o outro, é um arquivo ZIP contendo diversos arquivos em formato similar ao CSV (ele usa ponto-e-vírgula como delimitador, e coloca o conteúdo de cada campo entre aspas duplas).

Além de publicar essa base, cara (Marc) também provê um serviço gratuito para executar essas consultas; porém, quando fui usá-lo, estava fora do ar.

Pensei que isso pudesse ser interessante, se rodasse no App Engine do Google, que não custa nada, e é “ilimitado” pra qualquer um.  Fiz o seguinte:

  1. Criei o app lá, com nome free geoip
  2. Juntei os dados de IP por cidade, lista de países e regiões
  3. Importei tudo pro datastore, usando o bulk loader que vem no SDK
  4. O iWeb criou um frontend “bonito”

O resultado foi esse aqui:

http://freegeoip.appspot.com

Se alguém quiser fazer um favicon.ico, me avise!





freeswitch, eventsocket, twisted!

27 02 2009

freeswitchApós meses tentando resolver problemas ridículos com meu discador automático baseado em Asterisk, acabei mudando totalmente o rumo do sistema e re-escrevi todo o código para o FreeSWITCH.

A versão anterior se conectava no manager do Asterisk, originava as chamadas, e naquela completa bagunça sem padrão ia controlando o fluxo da chamada com auxílio de um AGI. Funções como transferência e gravação com detecção de caixa postal geravam diversas complicações devido à maneira como o Asterisk funciona, e o resultado era muita dor de cabeça.

Agora, a bagunça é assíncrona, tem padrão, e controla totalmente o FreeSWITCH através do Event Socket. Ao se conectar no Inbound Socket, o sistema origina as chamadas com destino ao Outbound Socket, onde cada uma é controlada individualmente e todos recursos funcionam perfeitamente, inclusive as várias maneiras de realizar transferência (bridge).

O FreeSWITCH está próximo da perfeição no quesito funcionamento, pecando apenas na documentação e alguns detalhes relacionados ao retorno (erro, etc) na originação de chamadas. Porém, esses detalhes são totalmente contornáveis com soluções simples, sem gambiarras e maracutaias como era com o Asterisk.

Em uma máquina comum, consigo manter 60 ligações simultâneas consumindo apenas 3% de cpu time. E o FreeSWITCH, que faz todo o controle das chamadas – conversão de mídia, gravação, detecção de caixa postal, de fax, etc etc etc, consome apenas 30% de cpu time.

Tudo isso é feito usando um protocolo do Twisted pro Event Socket do FreeSWITCH, que escrevi no último final de semana – entre carnaval, rock, etc. E pra felicidade geral da nação, tornei público pela lisença GPLv2, disponível no Google Code com exemplos.





paulistão 2009

27 01 2009

palmeirasAproveitando que meu time está em primeiro, aproveitei pra colocar mais um crawler com BautifulSoup, só que desta vez usando twisted.

A fonte de informação é o Globo Esporte, e pela leve bagunça que há no HTML e CSS, o código ficou meio feioso. Além disso, consegui deixar ainda mais feio pra poder imprimir bonito no terminal.

#!/usr/bin/env python
# coding: utf-8
# 20090127 AF - paulistão 2009, crawler

import re
from sys import stdout
from twisted.internet import reactor
from twisted.web.client import getPage
from BeautifulSoup import BeautifulSoup

crlft = re.compile(r'[\r\n\t]*')
URL = 'http://globoesporte.globo.com/Esportes/Futebol/Classificacao/0,,ESP0-9839,00.html'

def failure(err):
    print(err)
    reactor.stop()

def parser(contents):
    current = 1
    columns = 10

    f = lambda m: crlft.sub('', m.contents[0]+' ')
    soup = BeautifulSoup(contents, convertEntities='html')
    classes = ['borda borda-forte', 'borda semborda', ' borda-forte',
                ' semborda', 'time borda', 'borda', 'time ', '']

    stdout.write('% 18s' % 'TIME')
    for label in ['P', 'J', 'V', 'E', 'D', 'GP', 'GC', 'SG', '(%)']:
        stdout.write('% 5s' % label)
    stdout.write('\n')

    for item in soup.findAll('td', {'class':classes}):
        if item.find('span'):
            temp = '%02d. % 15s' % ((current / columns)+1,
                item.span.find('a') and f(item.span.a) or f(item.span))
            stdout.write(temp.encode('utf-8'))
        else:
            stdout.write('% 5s' % f(item))
            if not current % columns: stdout.write('\n')
        current += 1
    reactor.stop()

if __name__ == '__main__':
    deferred = getPage(URL)
    deferred.addCallback(parser)
    deferred.addErrback(failure)
    reactor.run()

E o resultado, no terminal:

$ python paulista.py
              TIME    P    J    V    E    D   GP   GC   SG  (%)
01.      Palmeiras    9    3    3    0    0    7    0    7  100
02.         Santos    6    2    2    0    0    4    1    3  100
03.    São Caetano    6    2    2    0    0    3    0    3  100
04.        Guarani    6    2    2    0    0    2    0    2  100
05.       Mirassol    4    2    1    1    0    5    3    2   66
06.      São Paulo    4    2    1    1    0    3    1    2   66
07.    Corinthians    4    2    1    1    0    3    2    1   66
08.    Ponte Preta    4    3    1    1    1    2    1    1   44
09.     Bragantino    3    2    1    0    1    4    3    1   50
10.       Paulista    3    2    1    0    1    2    2    0   50
11.    Santo André    3    3    1    0    2    1    2   -1   33
12.        Barueri    2    2    0    2    0    4    4    0   33
13.          Oeste    2    2    0    2    0    2    2    0   33
14.         Ituano    1    2    0    1    1    1    2   -1   16
15.    Botafogo-SP    1    2    0    1    1    4    6   -2   16
16.  Guaratinguetá    1    2    0    1    1    2    4   -2   16
17.        Marília    1    3    0    1    2    3    8   -5   11
18.       Noroeste    0    2    0    0    2    1    4   -3    0
19.     Portuguesa    0    2    0    0    2    0    3   -3    0
20.     Mogi Mirim    0    2    0    0    2    0    5   -5    0