PNQ
Description | PNQ is a Python implementation like Language Integrated Query (LINQ). |
Repository | https://github.com/sasano8/pnq |
目次
PNQ
PNQ is a Python implementation like Language Integrated Query (LINQ).
Danger
PNQはベータ版です。
- 現在、ドキュメントとAPIが一致していません。
- ライブラリが十分な品質に到達するまで、頻繁に内部実装やAPIが更新される恐れがあります。
- 本番環境では利用しないでください。
Features
- コレクション操作に関する多彩な操作
- アクセシブルなインタフェース
- 型ヒントの活用
- 非同期ストリームに対応
Similar tools
Documentation
- See documentation for more details.
Dependencies
- Python 3.7+
Installation
Install with pip:
$ pip install pnq
Getting Started
import pnq
for x in pnq.query([1, 2, 3]).map(lambda x: x * 2):
print(x)
# => 2, 4, 6
pnq.query([1, 2, 3]).map(lambda x: x * 2).result()
# => [2, 4, 6]
pnq.query([1, 2, 3]).filter(lambda x: x == 3).one()
# => 2
import asyncio
import pnq
async def aiter():
yield 1
yield 2
yield 3
async def main():
async for x in pnq.query(aiter()).map(lambda x: x * 2):
print(x)
# => 2, 4, 6
await pnq.query(aiter()).map(lambda x: x * 2)
# => [2, 4, 6]
await pnq.query(aiter()).filter(lambda x: x == 3)._.one()
# => 3
asyncio.run(main())
Quick Start
インストール
まず、pnq
をインストールします。
$ pip install pnq
クエリを組み立てる
pnq.query
を介して、データソースを加工するパイプラインメソッドをチェインできます。
組み立てたクエリは、result
を実行することでリストとして実体化できます。
import pnq
pnq.query([1, 2]).map(lambda x: x * 2).filter(lambda x: x > 2).result()
# => [4]
result
で返されるリストは、リストを継承した独自拡張クラス(pnq.list
)で、引き続きパイプラインメソッドをチェインできます。
import pnq
saved = pnq.query([1]).map(lambda x: x * 2).result()
saved.map(lambda x: x * 2).result()
# => [4]
pnq.list
はリストと完全な互換性がありますが、可能な限り副作用を避ける場合は、to(list)
または単にlist
で組込みのリストにできます。
import pnq
pnq.query([1]).map(lambda x: x * 2).to(list)
# => [2]
list(pnq.query([1]).map(lambda x: x * 2))
# => [2]
データソースが辞書の場合は、キーバリューペアが列挙されることに注意してください。
import pnq
pnq.query({"a": 1, "b": 2, "c": 3}).filter(lambda x: x[1] > 1).result()
# => [("b", 2), ("c", 3)]
リストでなく辞書として実体化したい場合は、result
の代わりにto(dict)
または単にdict
を使用してください。
import pnq
pnq.query({"a": 1, "b": 2, "c": 3}).filter(lambda x: x[1] > 1).to(dict)
# => {"b": 2, "c": 3}
dict(pnq.query({"a": 1, "b": 2, "c": 3}).filter(lambda x: x[1] > 1))
# => {"b": 2, "c": 3}
なお、to
はイテラブルを引数とする任意の関数を渡すことができます。
非同期イテレータを扱う
pnq.query
は非同期イテレータも取り扱うことができます。
ただし、非同期イテレータを実体化するにはresult
の代わりにawait
を使用します。
import asyncio
import pnq
async def aiter():
yield 1
yield 2
yield 3
async def main():
return await pnq.query(aiter()).map(lambda x: x * 2)
asyncio.run(main())
# >> [2, 4, 6]
クエリはfor
文でも使用できます。
import asyncio
import pnq
async def aiter():
yield 4
yield 5
yield 6
async def main():
for x in pnq.query([1, 2, 3]).map(lambda x: x * 2):
print(x)
# => 2, 4, 6
async for x in pnq.query(aiter()).map(lambda x: x * 2):
print(x)
# => 8, 10, 12
asyncio.run(main())
クエリを実行する
pnq.query
は可能な限り評価を保留(遅延評価)します。
クエリは、評価を要求されたとき実際に実行されます。
すでにいくつか評価方法(for
文、result
、to
)を紹介していますが、ほかにもいくつか評価メソッドを紹介します。
import pnq
# for x in ...: func(x)のショートカットとして使用できます
pnq.query([1, 2, 3]).map(lambda x: x * 2).each(print)
# => 2, 4, 6
# 要素の合計を求めます
pnq.query([1, 2, 3]).map(lambda x: x * 2).sum()
# => 12
非同期イテレータをデータソースとする場合は、_
で明示的に非同期イテレータを評価すると伝え、await
する必要があります。
import asyncio
import pnq
async def aiter():
yield 1
yield 2
yield 3
async def main():
await pnq.query(aiter()).map(lambda x: x * 2)._.each(print)
# => 2, 4, 6
await pnq.query(aiter()).map(lambda x: x * 2)._.sum()
# => 12
asyncio.run(main())
バッチ処理に活用する
request
メソッドは、簡易的なバッチ処理に活用できます。
request
メソッドはシーケンスの要素を任意の関数に送出し、実行結果(pnq.Response
)を返します。
処理中に例外が発生した場合、例外情報がerr
msg
stack_trace
属性にエラー情報が格納されます。
import datetime
import logging
import pnq
log_name = "log_" + datetime.datetime.utcnow().isoformat() + ".jsonl.log"
log = logging.FileHandler(filename=log_name)
logger = logging.getLogger()
logger.addHandler(log)
params = pnq.query([{"val": 0}, {"val": 1}])
# パラメータを関数に渡します
# パラメータはキーワード引数としてアンパックされるため、パラメータは辞書互換オブジェクトである必要があります
@params.request
def do_something(val):
if not (val > 0):
raise ValueError(f"val must be 1 or greater. But got {val}")
else:
return "success"
# 処理が失敗した場合、実行情報をjsonl(1行1Json)形式で出力します
@do_something.each
def dump_if_error(x: pnq.Response):
# エラーだった場合、ログに出力します
if x.err:
# レスポンスををjsonにシリアライズします
# シリアライザはデフォルトで`json.dumps`(ensure_ascii=False)が使用されます
logger.error(x.to_json())
# 全ての処理が成功した場合は`0`、いずれかが失敗した場合は`1`を返します
exit(pnq.from_jsonl(log_name).exists())
エラーログは、次のような出力になります。
cat `ls *.jsonl.log`
# {"func": "do_something", "kwargs": {"val": 0}, "err": "ValueError", "msg": "val must be 1 or greater: 0", "result": None, ...}
もっと知りたい
これであなたはクエリを自由に扱えるようになったはずです。
次章の参考例からお気に入りの機能を見つけましょう。
インターフェース
Transforming
pnq.actions.map
(self, selector, unpack='')シーケンスの各要素を新しいフォームに射影します。
str関数を渡した場合、None
は""
を返します(Pythonの標準動作は"None"
を返します)。
Args:
- self: 変換対象のシーケンス
- selector: 各要素に対する変換関数
- unpack: 引数をどのように展開するか指定する
""
: 展開せずにそのまま値を渡す(デフォルト)"*"
: 位置引数として展開する"**"
: キーワード引数として展開する"***"
: 位置引数とキーワード引数を展開する(pnq.Argumentsインスタンスに対して使用できます)
Usage:
>>> pnq.query([1]).map(lambda x: x * 2).to(list)
[2]
>>> pnq.query([None]).map(str).to(list)
[""]
>>> pnq.query([(1, 2)]).map(lambda arg1, arg2: arg1, "*").to(list)
[1]
>>> pnq.query([{"arg1": 1, "arg2": 2}]).map(lambda arg1, arg2: arg1, "**").to(list)
[1]
>>> pnq.query([pnq.Arguments(1, 2, name="test", age=20)]).map(lambda arg1, arg2, name, age: name, "***").to(list)
["test"]
pnq.actions.select
(self, field, *fields, attr=False)シーケンスの各要素からアイテムを選択し新しいフォームに射影します。 複数のアイテムを選択した場合は、タプルとして射影します。
Args:
- self: 変換対象のシーケンス
- field: 各要素から選択するアイテム
- fields: 各要素から選択する追加のアイテム
- attr: 要素の属性から取得する場合はTrue
Usage:
>>> pnq.query([(1, 2)]).select(0).to(list)
[1]
>>> pnq.query([{"id": 1, "name": "a"}]).select("id", "name").to(list)
[(1, "a")]
>>> pnq.query([user]).select("id", "name", attr=True).to(list)
[(1, "a")]
pnq.actions.select_as_tuple
(self, *fields, attr=False)シーケンスの各要素からアイテムまたは属性を選択し辞書として新しいフォームに射影します。 selectと似ていますが、選択した値が1つでも必ずタプルを返します。
Args:
- self: 変換対象のシーケンス
- fields: 選択するアイテムまたは属性
- attr: 属性から取得する場合はTrueとする
Usage:
>>> pnq.query([(1, 2)]).select_as_tuple(0).to(list)
[(1,)]
>>> pnq.query([user]).select_as_tuple("id", "name", attr=True).to(list)
[("1", "a")]
pnq.actions.select_as_dict
(self, *fields, attr=False)シーケンスの各要素からアイテムまたは属性を選択し辞書として新しいフォームに射影します。
Args:
- self: 変換対象のシーケンス
- fields: 選択するアイテムまたは属性
- attr: 属性から取得する場合はTrueとする
Usage:
>>> pnq.query([(1, 2)]).select_as_dict(0).to(list)
[{0: 1}]
>>> pnq.query([user]).select_as_dict("id", "name", attr=True).to(list)
[{"id": 1, "name": "b"}]
pnq.actions.reflect
(self, mapping, *, default=typing.NoReturn, attr=False)シーケンスの各要素のフィールドを与えたマッピングに基づき辞書として新しいフォームに射影します。
Args:
- self: 変換対象のシーケンス
- mapping: 元の要素のフィールドと射影先のフィールドの対応表
- default: フィールドを取得できない場合のデフォルト値
- attr: 属性から取得する場合はTrueとする
Usage:
>>> person = {"id":1, "name": "山田", "kana": "やまだ", "note": "hoge"}
>>> pnq.query([person]).reflect({
>>> "id": "id",
>>> "name": {"name", "searchable"},
>>> "kana": {"kana", "searchable"},
>>> "note": "searchable"
>>> }).to(list)
>>> [{"id": 1, "name": "山田", "kana": "やまだ", "searchable": ["山田", "やまだ", "hoge"]}]
pnq.actions.gather
(self)シーケンスの要素から結果を取り出し、新しいフォームに射影します。
要素は、concurrent.futures.Future
、または、awaitable
である必要があります。
pnq.query
は、それらのインターフェースを実装しているため、本メソッドで実体化可能です。
Usage:
from concurretn.futures import Future
import asyncio
def main1():
future = Future()
future.set_result(0)
return pnq.query([future]).gather().result()
async def main2():
async def heavy_task():
return 1
async def aiter():
yield 3
yield 4
future = Future()
future.set_result(0)
tasks = pnq.query([
future,
heavy_task(),
asyncio.create_task(heavy_task()),
pnq.([1, 2]).map(lambda x: x * 2),
pnq.(aiter()).map(lambda x: x * 2)
])
return await tasks.gather()
main() # => [0]
asyncio.run(main2()) # => [[0], [1], [1], [2, 4], [6, 8]]
pnq.actions.flat
(self, selector=None)シーケンスの各要素をイテラブルに射影し、その結果を1つのシーケンスに平坦化します。
Args:
- self: 変換対象のシーケンス
- selector: 各要素から平坦化する要素を選択する関数
Usage:
>>> pnq.query(["abc", "def"]).flat().to(list)
>>> ["a", "b", "c", "d", "e", "f"]
>>> countries = [{"country": "japan", "state": ["tokyo", "osaka"]}, {"country": "america", "state": ["new york", "florida"]}]
>>> pnq.query(countries).flat(lambda x: x["state"]).to(list)
>>> ["tokyo", "osaka", "new york", "florida"]
pnq.actions.traverse
(self, selector)シーケンスの各要素から再帰的に複数ノードを選択し、選択されたノードを1つのシーケンスに平坦化します。 各ルート要素から浅い順に列挙されます。
Args:
- self: 変換対象のシーケンス
- selector: 各要素から平坦化する要素を再帰的に選択する関数(戻り値はリスト等に含めて返す必要があります)
Usage:
>>> pnq.query(
>>> {"name": "a", "nodes": [{"name": "b", nodes: [{"name": c, "nodes": []}, {"name": "d", "nodes": []}}}]}]}
>>> ).traverse(lambdax x: x["nodes"]).select("name").to(list)
>>> ["a", "b", "c", "d"]
pnq.actions.pivot_unstack
(self, default=None)行方向に並んでいるデータを列方向に入れ替える
Args:
- self: 変換対象のシーケンス
- default: フィールドが存在しない場合のデフォルト値
Usage:
data = [
{"name": "test1", "age": 20},
{"name": "test2", "age": 25},
{"name": "test3", "age": 30, "sex": "male"},
]
{'name': ['test1', 'test2', 'test3'], 'age': [20, 25, 30], 'sex': [None, None, 'male']}
pnq.actions.pivot_stack
(self)列方向に並んでいるデータを行方向に入れ替える
Args:
- self: 変換対象のシーケンス
Usage:
{'name': ['test1', 'test2', 'test3'], 'age': [20, 25, 30], 'sex': [None, None, 'male']}
data = [
{"name": "test1", "age": 20, "sex": None},
{"name": "test2", "age": 25, "sex": None},
{"name": "test3", "age": 30, "sex": "male"},
]
pnq.actions.cast
(self, type)シーケンスの型注釈を変更します。この関数はエディタの型解釈を助けるためだけに存在し、何も処理を行いません。
実際に型を変更する場合は、map
を使用してください。
Args:
- self: 変換対象のシーケンス
- type: 新しい型注釈
Usage:
>>> pnq.query([1]).cast(float)
pnq.actions.enumerate
(self, start=0, step=1)シーケンスの各要素とインデックスを新しいフォームに射影します。
Args:
- self: 変換対象のシーケンス
- start: 開始インデックス
- step: 増分
Usage:
>>> pnq.query([1, 2]).enumerate().to(list)
[(0, 1), (1, 2)]
>>> pnq.query([1, 2]).enumerate(5).to(list)
[(5, 1), (6, 2)]
>>> pnq.query([1, 2]).enumerate(0, 10)).to(list)
[(0, 1), (10, 2)]
pnq.actions.group_by
(self, selector=シーケンスの各要素からセレクタ関数でキーとバリューを取得し、キーでグループ化されたシーケンスを生成します。 セレクタ関数を指定しない場合、各要素がすでにキーバリューのタプルであることを期待し、キーでグループ化します。
Args:
- self: 変換対象のシーケンス
- selector: キーとバリューを選択する関数
Usage:
>>> data = [
>>> {"name": "banana", "color": "yellow", "count": 3},
>>> {"name": "apple", "color": "red", "count": 2},
>>> {"name": "strawberry", "color": "red", "count": 5},
>>> ]
>>> pnq.query(data).group_by(lambda x: x["color"], x["name"]).to(list)
[("yellow", ["banana"]), ("red", ["apple", "strawberry"])]
>>> pnq.query(data).select("color", "count").group_by().to(dict)
{"yellow": [3], "red": [2, 5]}
pnq.actions.join
(self, right, on, select)pnq.actions.request
(self, func, executor=None, *, unpack='', chunksize=1)シーケンスの要素を任意の関数で並列処理し、新しいフォームに射影します。
戻り値または例外は、asyncio.Future
互換のResponse
オブジェクトに格納されます。
詳しくは並列処理を参照ください。
Args:
- func: 実行する任意の関数
- executor: 処理を実行するエクゼキュータ
- unpack: 引数をどのように展開するか指定する
- chunksize: cpuバウンドなエクゼキュータで有効です。指定したサイズの要素を一括処理します。
Usage:
>>> def do_something(id, val):
>>> if val:
>>> return 1
>>> else:
>>> raise ValueError(val)
>>>
>>> for res in pnq.query([{"id": 1, "val": True}, {"id": 2, "val": False}]).request(do_something):
>>> if res.err:
>>> print(f"ERROR: {res.to(dict)}")
>>> else:
>>> print(f"SUCCESS: {res.to(dict)}")
pnq.actions.parallel
(self, func, executor=None, *, unpack='', chunksize=1)シーケンスの要素を任意の関数で並列処理し、新しいフォームに射影します。 例外が発生した場合、後続の要素はスケジューリングされません。 その場合、実行中の処理とキューに積まれた処理の後処理は、エクゼキュータの挙動に依存します。 詳しくは並列処理を参照ください。
Args:
- func: 実行する任意の関数
- executor: 処理を実行するエクゼキュータ
- unpack: 引数をどのように展開するか指定する
- chunksize: cpuバウンドなエクゼキュータで有効です。指定したサイズの要素を一括処理します。
Filtering
pnq.actions.filter
(self, predicate)述語に基づいてシーケンスの要素をフィルタ処理します。
Args:
- self: フィルタ対象のシーケンス
- predicate: 条件を満たすか検証する関数
Usage:
>>> pnq.query([1, 2]).filter(lambda x: x == 1).to(list)
[1]
>>> pnq.query({1: True, 2: False, 3: True}).filter(lambda x: x[1] == True).to(list)
[(1, True), (3, True)]
pnq.actions.filter_type
(self, *types)指定した型に一致するシーケンスの要素をフィルタ処理します。
型は複数指定することができ、isinstance
の挙動に準じます。
Args:
- self: フィルタ対象のシーケンス
- types: フィルタする型
Usage:
>>> pnq.query([1, False, "a"]).filter_type(int).to(list)
[1, False]
>>> pnq.query([1, False, "a"]).filter_type(str, bool).to(list)
[False, "a"]
pnq.actions.filter_keys
(self, *keys)シーケンスの要素から指定したキーの要素のみフィルタ処理します。
このメソッドは、list
dict
set
などをクエリ化した直後のみ利用できます。
list
、tuple
の場合インデックスでフィルタされ、値を返します。dict
の場合キーでフィルタされ、キーと要素を返します。set
の場合は、キーでフィルタされ、キーを返します。
Args:
- self: フィルタ対象のシーケンス
- keys: フィルタするキー
Usage:
>>> pnq.query([1, 2]).filter_keys(1).to(list)
[2]
>>> pnq.query({"a": 1, "b": 2}).filter_keys("b").to(list)
[("b", 2)]
>>> pnq.query({"a", "b"}).filter_keys("b").to(list)
["b"]
pnq.actions.filter_unique
(self, selector=None)シーケンスの要素から重複する要素を除去する。 セレクタによって選択された値に対して重複が検証され、その値を返す。
Args:
- self: フィルタ対象のシーケンス
- selector: 重複を検証する値(複数の値を検証する場合はタプル)
Usage:
>>> pnq.query([1, 2, 1]).filter_unique().to(list)
[1, 2]
>>> pnq.query([(0 , 0 , 0), (0 , 1 , 1), (0 , 0 , 2)]).unique(lambda x: (x[0], x[1])).to(list)
[(0, 0), (0, 1)]
Validating
pnq.actions.must
(self, predicate, msg='')[deprecated]guard のエイリアスです。
pnq.actions.guard
(self, predicate, msg='')述語に基づいてシーケンスの要素を検証します。 検証に失敗した場合、即時に例外が発生します。
Args:
- self: フィルタ対象のシーケンス
- predicate: 条件を満たすか検証する関数
Usage:
>>> pnq.query([1, 2]).guard(lambda x: x == 1).to(list)
raise ValueError("2")
>>> pnq.query({1: True, 2: False, 3: True}).guard(lambda x: x[1] == True).to(list)
raise ValueError("(2, False)")
pnq.actions.must_type
(self, types)[deprecated]guard_type のエイリアスです。
pnq.actions.guard_type
(self, types)シーケンスの要素が指定した型のいずれかであるか検証します。
検証に失敗した場合、即時に例外が発生します。
型は複数指定することができ、isinstance
の挙動に準じます。
Args:
- self: フィルタ対象のシーケンス
- types: フィルタする型
Usage:
>>> pnq.query([1, 2]).guard_type(str, int).to(list)
raise ValueError("1 is not str")
pnq.actions.must_keys
(self, *keys)filter_keys
を実行し、全てのキーを取得できなかった場合例外を発生させます。
検証が完了するまで、ストリームは保留されます。
pnq.actions.must_unique
(self, selector=None)シーケンスの要素から値を選択し、選択した値が重複していないか検証します。
Args:
- self: フィルタ対象のシーケンス
- selector: 検証する値を選択する関数
- immediate: 即時に例外を発生させる
Usage:
>>> pnq.query([1, 2, 1]).must_unique().to(list)
raise DuplicateError("1")
Partitioning
pnq.actions.take
(self, count_or_range)シーケンスから指定した範囲の要素を返します。
Args:
- self: 取得対象のシーケンス
- count_or_range: シーケンスの先頭から取得する要素数または取得する範囲
Usage:
>>> pnq.query([1, 2, 3]).take(2).to(list)
[1, 2]
>>> pnq.query([1, 2, 3]).take(range(1, 2)).to(list)
[2]
pnq.actions.take_while
(self, predicate)シーケンスの先頭から、条件の検証に失敗するまでの要素を返します。 検証に失敗した要素は破棄されるため、値を消費するイテレータがソースの場合は注意が必要です。
Args:
- self: バイパス対象のシーケンス
- predicate: 条件を検証する関数
Usage:
>>> pnq.query([1, 2, 3]).enumerate().take_while(lambda v: v[0] < 2).select(1).to(list)
[1, 2]
pnq.actions.skip
(self, count_or_range)シーケンスから指定した範囲の要素をバイパスします。
Args:
- self: バイパス対象のシーケンス
- count_or_range: シーケンスの先頭からバイパスする要素数またはバイパスする範囲
Usage:
>>> pnq.query([1, 2, 3]).skip(1).to(list)
[2, 3]
>>> pnq.query([1, 2, 3]).skip(range(1, 2)).to(list)
[1, 3]
pnq.actions.skip_while
(self, predicate)シーケンスの先頭から、条件の検証に失敗するまでの要素をバイパスし、残りの要素を返します。
Args:
- self: バイパス対象のシーケンス
- predicate: 条件を検証する関数
Usage:
>>> pnq.query([1, 2, 3]).enumerate().skip_while(lambda v: v[0] < 1).select(1).to(list)
[2, 3]
pnq.actions.take_page
(self, page, size)シーケンスから指定した範囲の要素を返します。 範囲はページサイズと取得対象のページから求められます。
Args:
- self: バイパス対象のシーケンス
- page: 取得対象のページ(1始まり)
- size: 1ページあたりの要素数
Usage:
>>> pnq.query([0, 1, 2, 3, 4, 5]).take_page(page=1, size=2).to(list)
[0, 1]
>>> pnq.query([0, 1, 2, 3, 4, 5]).take_page(page=2, size=3).to(list)
[3, 4, 5]
pnq.actions.bundle
(self, size)指定した要素数毎に束ねる。
Args:
- self: 断片を含むシーケンス
- size: サイズ
Usage:
>>> pnq.query(["a", "bcd", "", "efghi", "j"]).bundle(2).to(list)
[["a", "bcd"], ["", "efghi"], ["j"]]
pnq.actions.chunk
(self, size)[deprecated]bundle のエイリアスです。
pnq.actions.defrag
(self, size)要素を指定したサイズの断片に整理します。
Args:
- self: 断片を含むシーケンス
- size: サイズ
Usage:
>>> pnq.query(["a", "bcd", "efghi"]).defrag(2).to(list)
["ab", "cd", "ef", "gh", "i"]
pnq.actions.ngram
(self, size)要素を指定したサイズの連続した断片としてを返します。
Args:
- self: 断片を含むシーケンス
- size: サイズ
Usage:
>>> pnq.query(["a", "bcd", "efghi"]).ngram(2).to(list)
["a", "ab", "bc", "de", "ef", "fg", "gh", "hi", "i"]
Sorting
pnq.actions.order_by
(self, key_selector=None, desc=False)シーケンスの要素を昇順でソートします。
Args:
- self: ソート対象のシーケンス
- selector: 要素からキーを抽出する関数。複数のキーを評価する場合は、タプルを返してください。
- desc: 降順で並べる場合はTrue
Usage:
>>> pnq.query([3, 2, 1]]).order_by().to(list)
[1, 2, 3]
>>> pnq.query([1, 2, 3]).order_by(lambda x: -x).to(list)
[3, 2, 1]
>>> pnq.query([1, 2, 3]).order_by(desc=True).to(list)
[3, 2, 1]
>>> pnq.query([(1, 2)), (2, 2), (2, 1)]).order_by(lambda x: (x[0], x[1])).to(list)
[(1, 2)), (2, 1), (2, 2)]
pnq.actions.order_by_fields
(self, *fields, desc=False, attr=False)pnq.actions.order_by_reverse
(self)シーケンスの要素を逆順にします。
Args:
- self: ソート対象のシーケンス
Usage:
>>> pnq.query([1, 2, 3]).order_by_reverse().to(list)
[3, 2, 1]
pnq.actions.order_by_shuffle
(self)シーケンスの要素をランダムにソートします。
Args:
- self: ソート対象のシーケンス
Usage:
>>> pnq.query([1, 2, 3]).order_by_shuffle().to(list)
[1, 3, 2]
>>> pnq.query([1, 2, 3]).order_by_shuffle().to(list)
[3, 1, 2]
Expanding
pnq.actions.union
(self, *iterables)全ての行を集合に含み重複は許可しない
pnq.actions.union_all
(self)全ての行を集合に含む
pnq.actions.union_intersect
(self)共通部分のみ抽出
pnq.actions.union_minus
(self)1つ目の問い合わせには存在するが、2つ目の問い合わせには存在しないデータを抽出 exceptと同じ意味
pnq.actions.zip
(self, *iterables)pnq.actions.compress
(self, *iterables)未実装
Finalizing - Executing
pnq.actions.result
(self)ストリームを評価し、結果をリストとして保存します。 返されたリストは、クエリメソッドが実装されたリストの拡張クラスです。
Args:
- self: 評価するシーケンス
Returns: pnq.list
Usage:
>>> saved = pnq.query([1, 2, 3]).result()
[1, 2, 3]
>>> saved.map(lambda x: x * 2).result()
[2, 4, 6]
pnq.actions.save
(self)result
のエイリアスです。
pnq.actions.to
(self, finalizer)ストリームをファイナライザによって処理します。
Args:
- self: 評価するシーケンス
- finalizer: イテレータを受け取るクラス・関数
Returns: ファイナライザが返す結果
Usage:
>>> pnq.query([1, 2, 3]).to(list)
[1, 2, 3]
>>> pnq.query({1: "a", 2: "b"}).to(dict)
{1: "a", 2: "b"}
pnq.actions.each
(self, func=シーケンスの各要素を指定した関数で逐次的に処理します。
for x in iterable: ...
のショートカットとして機能します。
関数を指定しない場合、単にイテレーションを実行します。
非同期関数を実行するには、クエリを非同期化する必要があります。
Args:
- func: 要素を処理する関数
- unpack: 引数をどのように展開するか指定する
Returns: None
Usage:
>>> pnq.query([1,2]).each()
>>> pnq.query([1,2]).each(print)
1
2
>>> await pnq.query([1,2])._.each(sync_func)
>>> await pnq.query([1,2])._.each(async_func)
pnq.actions.dispatch
(source, func, executor=None, *, unpack='', chunksize=1, on_complete=None)シーケンスの要素を任意の関数で並列処理します。
スケジューリングされた処理はバックグラウンドで実行され、処理結果はon_complete
に指定した関数で受け取れます。
アプリケーションが終了する時、バックグランドの残処理がどのように扱われるかはエクゼキュータの挙動に依存します。
on_complete
が確実に呼び出される保証はありません。
詳しくは並列処理を参照ください。
Args:
- func: 実行する任意の関数
- executor: 処理を実行するエクゼキュータ
- unpack: 引数をどのように展開するか指定する
- chunksize: cpuバウンドなエクゼキュータで有効です。指定したサイズの要素を一括処理します。
- on_complete(x): 処理結果を受け取る関数を指定します
pnq.actions.lazy
(self, finalizer)ファイナライザの実行するレイジーオブジェクトを返します。
レイジーオブジェクトをコールすると同期実行され、await
すると非同期実行されます。
Args:
- self: バイパス対象のシーケンス
- finalizer: イテレータを受け取るクラス・関数
Returns: ファイナライザが返す結果
Usage:
>>> lazy = pnq.query([1, 2, 3]).lazy(list)
>>> lazy()
[1, 2, 3]
>>> await lazy
[1, 2, 3]
>>> lazy = pnq.query([1, 2, 3]).lazy(pnq.actions.first)
>>> await lazy
1
Finalizing - Aggregating
pnq.actions.len
(self)シーケンスの要素数を返します。
Usage:
>>> pnq.query([1, 2, 3]).len()
3
pnq.actions.exists
(self)シーケンス内の要素の有無を確認します。
Usage:
>>> pnq.query([]).exists()
False
>>> pnq.query([1]).exists()
True
pnq.actions.all
(self, selector=シーケンスの全ての要素がTrueと判定できるか評価します。要素がない場合はTrueを返します。
- selector: 要素から検証する値を抽出する関数
Usage:
>>> pnq.query([]).all()
True
>>> pnq.query([0]).all()
False
>>> pnq.query([1]).all()
True
>>> pnq.query([1, 0]).all()
False
>>> pnq.query([1, 2]).all()
True
pnq.actions.any
(self, selector=シーケンスのいずれかの要素がTrueと判定できるか評価します。要素がない場合はFalseを返します。
- selector: 要素から検証する値を抽出する関数
Usage:
>>> pnq.query([]).any()
False
>>> pnq.query([0]).any()
False
>>> pnq.query([1]).any()
True
>>> pnq.query([1, 0]).any()
True
pnq.actions.contains
(self, value, selector=既定の等値比較子を使用して、指定した要素がシーケンスに含まれているか評価します。 辞書をソースとした場合は、キーバリューのタプルを比較します。
- value: 検索対象の値
- selector: 要素から検索する値を抽出する関数
Usage:
>>> fruits = pnq.query(["apple", "orange"])
>>> fruits.contains("banana")
False
>>> fruits.contains("apple")
True
>>> fruits.contains("orange")
True
>>> pnq.query({"a": 1, "b": 2}).contains("a")
False
>>> pnq.query({"a": 1, "b": 2}).contains(("a", 1))
True
pnq.actions.min
(self, selector=シーケンスの要素から最小の値を取得します。
Args:
- selector: 要素から計算する値を抽出する関数
- default: 要素が存在しない場合に返す値
Usage:
>>> pnq.query([1, 2]).min()
1
>>> pnq.query([]).min()
ValueError: min() arg is an empty sequence
>>> pnq.query([]).min(default=0)
0
pnq.actions.max
(self, selector=シーケンスの要素から最大の値を取得します。
Args:
- selector: 要素から計算する値を抽出する関数
- default: 要素が存在しない場合に返す値
Usage:
>>> pnq.query([1, 2]).max()
2
>>> pnq.query([]).max()
ValueError: max() arg is an empty sequence
>>> pnq.query([]).max(default=0)
0
pnq.actions.sum
(self, selector=シーケンスの要素を合計します。
- selector: 要素から計算する値を抽出する関数
Usage:
>>> pnq.query([]).sum()
0
>>> pnq.query([1, 2]).sum()
3
pnq.actions.average
(self, selector=シーケンスの要素の平均を求めます。
Args:
- selector: 要素から計算する値を抽出する関数
- exp: 丸める小数点以下の桁数
- round: 丸め方式
Usage:
>>> pnq.query([]).average()
0
>>> pnq.query([1, 2]).average()
1.5
pnq.actions.reduce
(self, seed, op='+=', selector=シーケンスの要素を指定した代入演算子でシードに合成し、合成結果を返す。
Args:
- seed: 合成対象とする初期値(左辺)
- op: 代入演算子または2項演算関数
- selector: 要素から結合する値を抽出する関数(右辺)
Usage:
>>> pnq.query([1]).reduce(10, "+=")
11
>>> pnq.query([[1, 2, 3], [4, 5, 6]]).reduce([], "+=")
[1, 2, 3, 4, 5, 6]
>>> pnq.query([{"a": 1}, {"b": 2}]).reduce({}, "|=") # python3.9~
{"a": 1, "b": 2}
>>> pnq.query([1, 2, 3, 4, 5]).reduce(0, "+=", lambda x: x * 10)
150
>>> pnq.query([1, 2, 3, 4, 5]).reduce(0, lambda l, r: l + r, lambda x: x * 10)
150
pnq.actions.concat
(self, selector=シーケンスの要素を文字列として連結します。 Noneは空文字として扱われます。
Args:
- selector: 要素から結合する値を抽出する関数
- delimiter: 区切り文字
Usage:
>>> pnq.query([]).concat()
""
>>> pnq.query([1, 2]).concat()
"12"
>>> pnq.query(["a", "b"]).concat()
"ab"
>>> pnq.query(["a", None]).concat()
"a"
>>> pnq.query(["a", "b"]).concat(delimiter=",")
"a,b"
Finalizing - Getting
pnq.actions.get
(self, key, default=typing.NoReturn)リストや辞書などのgetitem
を呼び出します。セットでも使用でき、キーが存在する場合そのままキーを返します。
デフォルトを設定した場合、キーが存在しない場合はデフォルトを返します。
このメソッドは実体化している状態でのみ使用することができ、クエリ化されている状態では使用できません。
Args:
- key: キー
- default: キーが存在しない場合に返すデフォルト値
Usage:
>>> data = pnq.query({"a", "b", "c"})
>>> data.get("a")
"a"
>>> data.get("d")
raise KeyError("d")
>>> data.get("d", 10)
10
pnq.actions.one
(self)シーケンス内の要素が1つであることを検証し、その要素を返します。 検証に失敗した場合は、例外が発生します。 デフォルト値を設定した場合は、要素が存在しない場合にデフォルト値を返します。
one
関数は、1つの要素であるか検証するために2つ目の要素を取り出そうとします。
ソースとなるイテラブルが値を消費する実装だと、2つの要素が失われる可能性があることに注意してください。
Args:
- default: 要素が存在しない場合に返す値
Usage:
>>> pnq.query([]).one()
raise NoElementError("...")
>>> pnq.query([1]).one()
1
>>> pnq.query([1, 2]).one()
raise NotOneElementError("...")
>>> pnq.query([]).one(None)
None
>>> pnq.query([1, 2]).one(None)
raise NotOneElementError("...")
pnq.actions.first
(self)シーケンス内の最初の要素を返します。 要素が存在しない場合は、例外が発生します。
セットをソースとした場合、セットは順序を保持しないため、順序性は期待できません。
Args:
- default: 要素が存在しない場合に返す値
Usage:
>>> pnq.query([]).first()
raise NoElementError("...")
>>> pnq.query([1]).first()
1
>>> pnq.query([1, 2]).first()
1
>>> pnq.query([]).first(None)
None
pnq.actions.last
(self)シーケンス内の最後の要素を返します。 要素が存在しない場合は、例外が発生します。
セットをソースとした場合、セットは順序を保持しないため、順序性は期待できません。
Args:
- default: 要素が存在しない場合に返す値
Usage:
>>> pnq.query([]).last()
raise NoElementError("...")
>>> pnq.query([1]).last()
1
>>> pnq.query([1, 2]).last()
2
>>> pnq.query([]).last(None)
None
pnq.actions.get_or
(self, key, default)pnq.actions.one_or
(self, default)pnq.actions.first_or
(self, default)pnq.actions.last_or
(self, default)pnq.actions.get_or_raise
(self, key, exc)基本的な動作はget
を参照ください。
KeyErrorが発生した時、任意の例外を発生させます。
Args:
- exc: KeyError時に発生させる例外
Usage:
>>> pnq.query([]).get_or_raise(0, Exception(f"Not Exist Key: 0"))
raise Exception("Not Exist Key: 0")
pnq.actions.one_or_raise
(self, exc)基本的な動作はone
を参照ください。
NoElementErrorが発生した時、任意の例外を発生させます。
NotOneElementErrorはキャッチしません。
Args:
- exc: NoElementError時に発生させる例外
Usage:
>>> pnq.query([]).one_or_raise(0, Exception("No exists."))
raise Exception("No exists.")
pnq.actions.first_or_raise
(self, exc)基本的な動作はfirst
を参照ください。
NoElementErrorが発生した時、任意の例外を発生させます。
Args:
- exc: NoElementError時に発生させる例外
Usage:
>>> pnq.query([]).first_or_raise(0, Exception("No exists."))
raise Exception("No exists.")
pnq.actions.last_or_raise
(self, exc)基本的な動作はlast
を参照ください。
NoElementErrorが発生した時、任意の例外を発生させます。
Args:
- exc: NoElementError時に発生させる例外
Usage:
>>> pnq.query([]).last_or_raise(0, Exception("No exists."))
raise Exception("No exists.")
クラス
pnq._itertools.requests.Response
(func, args, kwargs, err, res, start, end)Response(func, args, kwargs, err, res, start, end)
pnq._itertools.requests.StopWatch
(name='')コンテキスト内の処理時間を計測します。
Args:
- name: 任意の名前を付与できます
Members:
- name: 初期化時に付与した名前
- start: コンテキストの開始時間(UTC)
- end: コンテキストの完了時間(UTC)
- elapsed: 開始時間と完了時間の差分秒数
Usage:
>>> with StopWatch("test") as result:
>>> [x for x in range(10000)]
>>> print(result)
{'name': 'test', 'start': '2021-09-13T14:10:04.780085+00:00', 'end': '2021-09-13T14:10:11.907716+00:00', 'elapsed': 7.127631}
to_dict
(self)計測データを辞書化します。日付データはisoformatで出力されます
データソース
一般的に利用頻度が高いデータソースに対するクエリのショートカットを提供します。
filesystem
ファイルやディレクトリに関するクエリを提供します。
pnq.ds.filesystem.ls
(pathname='*', *, root_dir=None, dir_fd=None, recursive=False)pnq.ds.filesystem.files
(pathname='*', *, root_dir=None, dir_fd=None, recursive=False)pnq.ds.filesystem.dirs
(pathname='*', *, root_dir=None, dir_fd=None, recursive=False)schedule
計画した間隔でイベント(datetime.datetime(UTC)
)を提供し続けます。
無限イテレータのため、必要に応じてtake
等で終了条件を定めてください。
CPUの実行状況により、時間通りにイベントが送信されるとは限りません。 多少遅延することを想定してください。
pnq.ds.schedule.tick
(seconds, token=None)pnq.ds.schedule.tick_async
(seconds, token=None)例外クラス
Exception classes
pnq.exceptions.PnqException
(msg='')Pnqに関連する全ての例外の基底クラス
pnq.exceptions.NotFoundError
(key)クエリがキーに対応する要素を要求したが存在しない。 IndexErrorとKeyErrorはKeyNotFoundErrorに置き換わります
関連: get
must_get_many
pnq.exceptions.NoElementError
(msg='')クエリが何らかの要素を要求したが要素が存在しない
関連: one
first
last
pnq.exceptions.NotOneElementError
(msg='')クエリが要素がひとつであることを要求したが複数の要素が存在した
関連: one
pnq.exceptions.DuplicateElementError
(msg='')クエリが要素が重複していないことを要求したが重複を検知した
関連: must_unique
高度な使用方法
並列処理
pnq
は任意のエクゼキュータでの並列処理をサポートし、効率的に計算資源を活用できます。
並列処理に対応しているメソッドは次の通りです。
- parallel(結果のみを返す)
- request(成功・失敗情報を含む結果を返す)
- dispatch(処理を投げっぱなしにする)
また、gather
を使うと複数のクエリを並列実行し、非同期に完了待機できます。
gather
はコルーチンなどawaitable
なオブジェクトに対応しています。
次の例は、クエリを並列実行し、実行中のクエリの完了を待たずに次のクエリを順次実行する例です。 ただし、むやみな並列処理はデッドロックやメモリ圧迫などの問題を引き起こすので控えてください。
import asyncio
import pnq
from pnq.concurrent import ProcessPool, ThreadPool, AsyncPool
def mul(x):
return x * 2
async def mul_async(x):
return x * 2
async def aiter():
yield 1
yield 2
yield 3
async def notify(x):
await asyncio.sleep(0.1)
print(x)
async def main():
async with ProcessPool(2) as proc, ThreadPool(2) as thread, AsyncPool(2) as aio:
tasks = pnq.query([
pnq.query([1, 2, 3]).parallel(mul, proc),
pnq.query([1, 2, 3]).parallel(mul_async, proc),
pnq.query(aiter()).parallel(mul, proc),
pnq.query(aiter()).parallel(mul_async, proc),
pnq.query([1, 2, 3]).parallel(mul, thread),
pnq.query([1, 2, 3]).parallel(mul_async, thread),
pnq.query(aiter()).parallel(mul, thread),
pnq.query(aiter()).parallel(mul_async, thread),
pnq.query([1, 2, 3]).parallel(mul, aio),
pnq.query([1, 2, 3]).parallel(mul_async, aio),
pnq.query(aiter()).parallel(mul, aio),
pnq.query(aiter()).parallel(mul_async, aio),
])
await tasks.gather().flat().dispatch(notify, aio)
asyncio.run(main())
pnq
が提供するエクゼキュータは次の通りです。
ProcessPool
CPUバウンドな重たい処理に向いています。GILの制限を受けません。
チャンクサイズを指定すると、一括処理を効率化できます。
チャンクサイズはProcessPool
でのみ有効で、それ以外のエクゼキューターでは無視されます。
- 同期関数と非同期関数はプロセスプール上で実行されます。
ThreadPool
I/Oバウンドなプリエンプティブマルチタスク(time.sleepなど)に向いています。
- 同期関数と非同期関数はスレッドプール上で実行されます。
AsyncPool
I/Oバウンドなノンエンプティブマルチタスク(asyncio.sleepなど)に向き、シングルスレッドを効率的に利用します。 同期関数はスレッドプール上で実行されるため、ThreadPoolの代用としても働きます。
- 同期関数はスレッドプール上で実行されます。
- 非同期関数はイベントループ上(asyncio)で実行されます。
クエリは非同期実行のみ許可されます。
DummyPool
エクゼキュータを指定しない場合に使用されます。
- 同期実行時は並列化されず、単に現在のスレッドで処理を即時実行します。
- 非同期実行時は、
AsyncPool
のように振る舞います。 - 同時実行数は、指定した同時実行数+1(同期実行は即時処理)となります。
コンテキストが存在せず、投げっぱなしにされた処理の実行は保証されないため、 基本的には別のエクゼキュータを指定するようにしてください。
非同期処理
実行
次の場合、クエリは非同期の文脈で実行する必要があります。
- 非同期イテレータをソースとした場合
request
による並列処理を非同期に待ち受けたい場合
import asyncio
import pnq
async def async_iterate():
yield 1
yield 2
yield 3
async def sleep(x):
await asyncio.sleep(1)
print(x)
async def main():
async for x in pnq.query(async_iterate()):
print(x)
async for x in pnq.query([dict(x=1), dict(x=2), dict(x=3)]).request(sleep):
print(x)
asyncio.run(main())
キャンセル管理
pnq
は簡単なキャンセル機構を提供し、これを利用できます。
pnq.run
に、非同期関数を渡すと、その関数を起動し、第一引数にキャンセルトークンを渡します。
第一引数を受け入れ可能な場合、その関数はキャンセルコントロールの意思があるとみなされます。
次のコードは、10秒間待機している間キャンセル(SIGTERMとSIGINT)を受け入れません。
import asyncio
import pnq
async def main(token):
await asyncio.sleep(10)
print("Hello, world!")
pnq.run(main)
キャンセルを検知するとtoken.is_running
はFalse
を返すようになります。
token.is_running
を監視することで、任意のタイミングで処理を中断できます。
次のコードは、token.is_running
がFalse
と評価されるまで、要素を流し続けます。
import asyncio
import pnq
async def main(token):
async def infinity(token):
while token.is_running:
yield 1
await asyncio.sleep(1)
async for x in pnq.query(infinity(token)):
print(x)
pnq.run(main)
関数が第一引数を受け入れ可能でない場合、その関数はキャンセルコントロールの意思がないとみなされます。
次のコードは、キャンセルを検知すると実行は即時キャンセルされます。
import asyncio
import pnq
async def main():
async def infinity():
while True:
yield 1
await asyncio.sleep(1)
async for x in pnq.query(infinity()):
print(x)
pnq.run(main)
例外処理
pnq
は基本的に例外をキャッチしませんが、次のクエリのみ関数実行時の例外をキャッチします。
request
これらのクエリは、実行結果を含んだオブジェクトResponse
を返し、
Response
のerr
がNone
でない場合、処理は失敗したとみなせます。
結果はresult
から取得できます。
処理が失敗している場合は例外が返ります。
スタックトレースは次のように取得できます。
import traceback
async def raise_error(x):
raise Exception(str(x))
for res in pnq.query([{"x": 1}].request(raise_error)):
if res.err:
msg = "".join(
traceback.format_exception(etype=type(err), value=err, tb=err.__traceback__)
)
print(msg)
else:
print(res.result())
実装例
非同期なリクエストを含むクエリの実装例です。
import asyncio
import pnq
import httpx
async def main():
async with httpx.AsyncClient() as client:
params = pnq.query([
{"url": "test_url_1"},
{"url": "test_url_2"},
])
async def fetch_from_url(url):
res = await client.get(url)
res.raise_for_status()
return res
@params.request(fetch_from_url, unpack="**").group_by
def split_success_and_error(res):
return (not res.err, res)
return dict(await split_success_and_error)
result = asyncio.run(main())
# {
# True: [res1, res2, ...],
# False: [res3, res4, ...],
# }
性能評価
性能評価
pnqのイテレーション性能
内包表記と比較するとpnq
は1.36程度性能が落ちます。
日常的に大量のデータを処理する場合は、ネイティブな記法に書き直すことも検討ください。
import pnq
from pnq.base.requests import StopWatch
from decimal import Decimal, ROUND_HALF_UP
RANGE = 100000000
def dummy(x):
return x
with StopWatch("内包表記") as result_1:
list(dummy(x) for x in range(RANGE) if x % 2)
with StopWatch("イテレータ") as result_2:
def iterate():
for i in range(RANGE):
if i % 2:
yield dummy(i)
list(iterate())
with StopWatch("pnq") as result_3:
pnq.query(range(RANGE)).filter(lambda x: x % 2).map(dummy).to_list()
difference = Decimal(f"{result_1.elapsed}") - Decimal(f"{result_3.elapsed}")
rate = Decimal(f"{result_3.elapsed}") / Decimal(f"{result_1.elapsed}")
rate = rate.quantize(Decimal("0.01"), rounding=ROUND_HALF_UP)
print(f"{result_1}")
print(f"{result_2}")
print(f"{result_3}")
print(f"内包表記:pnq 性能差割 :{rate}")
{'name': '内包表記', 'start': '2021-09-20T12:42:35.219151+00:00', 'end': '2021-09-20T12:42:42.270021+00:00', 'elapsed': 7.05087}
{'name': 'イテレータ', 'start': '2021-09-20T12:42:42.270046+00:00', 'end': '2021-09-20T12:42:49.383548+00:00', 'elapsed': 7.113502}
{'name': 'pnq', 'start': '2021-09-20T12:42:49.383573+00:00', 'end': '2021-09-20T12:42:58.979269+00:00', 'elapsed': 9.595696}
内包表記:pnq 性能差割 :1.36
非同期イテレータの性能
非同期イテレータは同期イテレータより性能が2.25倍程度遅いです。
非同期イテレータは、ネットワークI/OやファイルI/Oなどの待機時間で、 並列処理できるケースで有効です。
特に理由がない場合は、同期イテレータを積極的に使うようにしてください。
import asyncio
from decimal import Decimal, ROUND_HALF_UP
from pnq.models import StopWatch
class Range:
def __init__(self, count):
self.count = count
def __iter__(self):
for i in range(self.count):
yield i
async def __aiter__(self):
for i in range(self.count):
yield i
calculator = Range(100000000)
async def main():
with StopWatch() as result_1:
for i in calculator:
pass
with StopWatch() as result_2:
async for i in calculator:
pass
difference = Decimal(f"{result_1.elapsed}") - Decimal(f"{result_2.elapsed}")
rate = Decimal(f"{result_2.elapsed}") / Decimal(f"{result_1.elapsed}")
rate = rate.quantize(Decimal("0.01"), rounding=ROUND_HALF_UP)
print(f"同期 :{result_1}")
print(f"非同期 :{result_2}")
print(f"性能差割合 :{rate}")
asyncio.run(main())
同期 :{'start': '2021-09-13T10:28:55.240113+00:00', 'end': '2021-09-13T10:28:58.890342+00:00', 'elapsed': 3.650229}
非同期 :{'start': '2021-09-13T10:28:58.890577+00:00', 'end': '2021-09-13T10:29:07.085747+00:00', 'elapsed': 8.19517}
性能劣化率 :2.25