コンテンツにスキップ

Quick Start

インストール

まず、pnqをインストールします。

$ pip install pnq

クエリを組み立てる

pnq.queryを介して、データソースを加工するパイプラインメソッドをチェインできます。 組み立てたクエリは、resultを実行することでリストとして実体化できます。

import pnq

pnq.query([1, 2]).map(lambda x: x * 2).filter(lambda x: x > 2).result()
# => [4]

resultで返されるリストは、リストを継承した独自拡張クラス(pnq.list)で、引き続きパイプラインメソッドをチェインできます。

import pnq

saved = pnq.query([1]).map(lambda x: x * 2).result()
saved.map(lambda x: x * 2).result()
# => [4]

pnq.listはリストと完全な互換性がありますが、可能な限り副作用を避ける場合は、to(list)または単にlistで組込みのリストにできます。

import pnq

pnq.query([1]).map(lambda x: x * 2).to(list)
# => [2]

list(pnq.query([1]).map(lambda x: x * 2))
# => [2]

データソースが辞書の場合は、キーバリューペアが列挙されることに注意してください。

import pnq

pnq.query({"a": 1, "b": 2, "c": 3}).filter(lambda x: x[1] > 1).result()
# => [("b", 2), ("c", 3)]

リストでなく辞書として実体化したい場合は、resultの代わりにto(dict)または単にdictを使用してください。

import pnq

pnq.query({"a": 1, "b": 2, "c": 3}).filter(lambda x: x[1] > 1).to(dict)
# => {"b": 2, "c": 3}

dict(pnq.query({"a": 1, "b": 2, "c": 3}).filter(lambda x: x[1] > 1))
# => {"b": 2, "c": 3}

なお、toはイテラブルを引数とする任意の関数を渡すことができます。

非同期イテレータを扱う

pnq.queryは非同期イテレータも取り扱うことができます。 ただし、非同期イテレータを実体化するにはresultの代わりにawaitを使用します。

import asyncio
import pnq

async def aiter():
    yield 1
    yield 2
    yield 3

async def main():
    return await pnq.query(aiter()).map(lambda x: x * 2)

asyncio.run(main())
# >> [2, 4, 6]

クエリはfor文でも使用できます。

import asyncio
import pnq

async def aiter():
    yield 4
    yield 5
    yield 6

async def main():
    for x in pnq.query([1, 2, 3]).map(lambda x: x * 2):
        print(x)
    # => 2, 4, 6

    async for x in pnq.query(aiter()).map(lambda x: x * 2):
        print(x)
    # => 8, 10, 12

asyncio.run(main())

クエリを実行する

pnq.queryは可能な限り評価を保留(遅延評価)します。 クエリは、評価を要求されたとき実際に実行されます。

すでにいくつか評価方法(for文、resultto)を紹介していますが、ほかにもいくつか評価メソッドを紹介します。

import pnq

# for x in ...: func(x)のショートカットとして使用できます
pnq.query([1, 2, 3]).map(lambda x: x * 2).each(print)
# => 2, 4, 6

# 要素の合計を求めます
pnq.query([1, 2, 3]).map(lambda x: x * 2).sum()
# => 12

非同期イテレータをデータソースとする場合は、_で明示的に非同期イテレータを評価すると伝え、awaitする必要があります。

import asyncio
import pnq

async def aiter():
    yield 1
    yield 2
    yield 3

async def main():
    await pnq.query(aiter()).map(lambda x: x * 2)._.each(print)
    # => 2, 4, 6

    await pnq.query(aiter()).map(lambda x: x * 2)._.sum()
    # => 12

asyncio.run(main())

バッチ処理に活用する

requestメソッドは、簡易的なバッチ処理に活用できます。 requestメソッドはシーケンスの要素を任意の関数に送出し、実行結果(pnq.Response)を返します。

処理中に例外が発生した場合、例外情報がerr msg stack_trace属性にエラー情報が格納されます。

import datetime
import logging
import pnq

log_name = "log_" + datetime.datetime.utcnow().isoformat() + ".jsonl.log"
log = logging.FileHandler(filename=log_name)

logger = logging.getLogger()
logger.addHandler(log)


params = pnq.query([{"val": 0}, {"val": 1}])

# パラメータを関数に渡します
# パラメータはキーワード引数としてアンパックされるため、パラメータは辞書互換オブジェクトである必要があります
@params.request
def do_something(val):
    if not (val > 0):
        raise ValueError(f"val must be 1 or greater. But got {val}")
    else:
        return "success"


# 処理が失敗した場合、実行情報をjsonl(1行1Json)形式で出力します
@do_something.each
def dump_if_error(x: pnq.Response):
    # エラーだった場合、ログに出力します
    if x.err:
        # レスポンスををjsonにシリアライズします
        # シリアライザはデフォルトで`json.dumps`(ensure_ascii=False)が使用されます
        logger.error(x.to_json())


# 全ての処理が成功した場合は`0`、いずれかが失敗した場合は`1`を返します
exit(pnq.from_jsonl(log_name).exists())

エラーログは、次のような出力になります。

cat `ls *.jsonl.log`
# {"func": "do_something", "kwargs": {"val": 0}, "err": "ValueError", "msg": "val must be 1 or greater: 0", "result": None, ...}

もっと知りたい

これであなたはクエリを自由に扱えるようになったはずです。

次章の参考例からお気に入りの機能を見つけましょう。