Views: 0
構文結合表CSVでつくる構造化CSVとJP PINT UBLの相互変換
2026-06-25
1. はじめに
前回の 記事 では,日本版コアインボイスを構造化CSVとして表現し,JP PINT UBLとの対応を SemanticPath と XPath の構文結合表として整理した。
本稿では,その構文結合表を使って,構造化CSVとJP PINT UBL XMLを相互変換するPythonプログラムの考え方を紹介する。
ここで示すプログラムは,JP PINT UBLの完全な実装ではない。目的は,次の点をイメージとしてつかむことである。
-
CSV側の項目を SemanticPath で指定する。
-
XML側の項目を XPath で指定する。
-
構文結合表を
bindings.csvとして外部化する。 -
プログラム本体は,構文結合表を読み取り,CSVからXMLを生成する。
-
逆方向では,XMLからXPathで値を取得し,構造化CSVの行へ展開する。
-
対応先の構文が変わっても,原則として結合定義CSVを変更すれば,同じ汎用プログラムを利用できる。
つまり,変換ロジックをプログラム内に直接書き込むのではなく,構文結合表として外部化することが重要である。
2. ファイル構成
今回のサンプル syntax_binding_revised_package.zip は,次の3ファイルを使う。
| ファイル | 役割 |
|---|---|
|
|
日本版コアインボイスを表す構造化CSV。 |
|
|
構造化CSV側の SemanticPath と,JP PINT UBL側の XPath を対応付ける構文結合表。 |
|
|
構造化CSVと構文結合表CSVを読み込み,JP PINT UBL XMLを生成し,さらにXMLから構造化CSVへ戻すサンプルプログラム。 |
実行例は次のとおりである。
python3 syntax_binding.py \
--input-csv invoice.csv \
--bindings-csv bindings.csv \
--xml-out invoice.xml \
--roundtrip-csv roundtrip.csv \
--print-xml
この実行により,次のファイルが生成される。
| 出力ファイル | 内容 |
|---|---|
|
|
構造化CSVから生成したJP PINT UBL subset XML。 |
|
|
生成したXMLを読み戻して再構成した構造化CSV。 |
3. 構造化CSVと構文結合表
3.1. 構造化CSVの例
本稿では,請求書構造化CSVを例にする。
dInvoice,dInvoiceParty,dDocumentReference,dPayment,dTaxBreakdown,dInvoiceLine,invoiceNumber,issueDate,invoiceTypeCode,currencyCode,totalAmount,partyRole,partyName,partyTaxID,referenceType,referenceID,paymentDueDate,paymentMeansCode,paymentReference,taxCategoryCode,taxRate,taxableAmount,taxAmount,lineID,itemName,lineTaxCategoryCode,lineTaxRate,invoicedQuantity,unitCode,unitPriceAmount,lineExtensionAmount
1,,,,,,INV-26-861,2026-06-20,380,JPY,16802,,,,,,,,,,,,,,,,,,,,
1,1,,,,,,,,,,Seller,売手株式会社,T1234567890123,,,,,,,,,,,,,,,,,
1,2,,,,,,,,,,Buyer,買手株式会社,,,,,,,,,,,,,,,,,,
1,,1,,,,,,,,,,,,Order,PO-2026-0001,,,,,,,,,,,,,,,
1,,2,,,,,,,,,,,,Delivery,DN-2026-0001,,,,,,,,,,,,,,,
1,,,1,,,,,,,,,,,,,2026-07-31,30,INV-2026-0001,,,,,,,,,,,,
1,,,,1,,,,,,,,,,,,,,,S,10,12320,1232,,,,,,,,
1,,,,1,,,,,,,,,,,,,,,AA,8,3010,240,,,,,,,,
1,,,,,1,,,,,,,,,,,,,,,,,,INV-26-861-1,商品A,S,10,10,EA,1000,10000
1,,,,,2,,,,,,,,,,,,,,,,,,INV-26-861-2,商品B,S,10,4,EA,580,2320
1,,,,,3,,,,,,,,,,,,,,,,,,INV-26-861-3,食品C,AA,8,7,EA,430,3010
このCSVでは,請求書ヘッダー,売手,買手,参照文書,支払情報,税率別内訳,請求明細を,1枚のCSVに格納している。
重要なのは,dInvoiceParty,dDocumentReference,dPayment,dTaxBreakdown,dInvoiceLine などの列である。これらは,行がどの階層又は繰返し構造に属するかを示すディメンション列である。
例えば,次の行は売手を表す。
1,1,,,,,,,,,,Seller,売手株式会社,T1234567890123,,,,,,,,,,,,,,,,,
この行では,dInvoiceParty に 1 が入り,partyRole に Seller が入っている。したがって,この行は「請求書当事者のうち,売手を表す行」と解釈できる。
3.2. 構文結合表CSV
構文結合表は,構造化CSV側の SemanticPath と,JP PINT UBL側の XPath を対応付ける表である。
今回の改訂では,この対応表をPythonコード中の BINDINGS リストではなく,次の bindings.csv として外部化した。
semantic_path,xpath
$.Invoice.invoiceNumber,/Invoice/cbc:ID
$.Invoice.issueDate,/Invoice/cbc:IssueDate
$.Invoice.invoiceTypeCode,/Invoice/cbc:InvoiceTypeCode
$.Invoice.currencyCode,/Invoice/cbc:DocumentCurrencyCode
$.Invoice.dInvoiceParty[?@.partyRole=Seller].partyName,/Invoice/cac:AccountingSupplierParty/cac:Party/cac:PartyName/cbc:Name
$.Invoice.dInvoiceParty[?@.partyRole=Seller].partyTaxID,/Invoice/cac:AccountingSupplierParty/cac:Party/cac:PartyTaxScheme/cbc:CompanyID
$.Invoice.dInvoiceParty[?@.partyRole=Buyer].partyName,/Invoice/cac:AccountingCustomerParty/cac:Party/cac:PartyName/cbc:Name
$.Invoice.dInvoiceParty[?@.partyRole=Buyer].partyTaxID,/Invoice/cac:AccountingCustomerParty/cac:Party/cac:PartyTaxScheme/cbc:CompanyID
$.Invoice.dDocumentReference[?@.referenceType=Order].referenceID,/Invoice/cac:OrderReference/cbc:ID
$.Invoice.dDocumentReference[?@.referenceType=Delivery].referenceID,/Invoice/cac:DespatchDocumentReference/cbc:ID
$.Invoice.paymentDueDate,/Invoice/cbc:DueDate
$.Invoice.paymentReference,/Invoice/cac:PaymentMeans/cbc:PaymentID
例えば,売手名称は次のように指定する。
$.Invoice.dInvoiceParty[?@.partyRole=Seller].partyName
これは,次の意味である。
dInvoiceParty の行のうち, partyRole が Seller である行を選択し, その行の partyName を取得する。
その値を,次のXPathで指定されるXML要素へ設定する。
/Invoice/cac:AccountingSupplierParty/cac:Party/cac:PartyName/cbc:Name
プログラムは,CSV列名とXML要素名の対応を直接知っているのではない。対応関係を bindings.csv から読み込み,SemanticPath と XPath を使って処理している。
4. Pythonプログラムの全体構成
サンプルプログラムは,大きく次の処理に分けられる。
| 処理 | 役割 |
|---|---|
|
CSV読込み |
|
|
構文結合表読込み |
|
|
SemanticPath評価 |
CSV行リストから,指定された SemanticPath に対応する値を取得する。 |
|
XML生成 |
XPathに従ってXML要素を作成し,取得した値を設定する。 |
|
XML読込み |
ElementTreeでXMLを読み込み,XPath相当のパスで値を取得する。 |
|
CSV生成 |
XMLから取得した値を,構造化CSVの行として再構成する。 |
本稿では,Python標準ライブラリの csv と xml.etree.ElementTree を使用する。
5. CSVファイルを読み込む
構造化CSVは,Pythonの標準ライブラリ csv で読み込む。
import argparse
import csv
import io
import re
import sys
import xml.etree.ElementTree as ET
from decimal import Decimal
from pathlib import Path
def read_core_csv(text: str) -> list[dict[str, str]]:
"""CSV文字列を辞書行のリストとして読み込む。"""
return list(csv.DictReader(io.StringIO(text)))
def read_core_csv_file(path: str | Path, encoding: str = "utf-8-sig") -> list[dict[str, str]]:
"""構造化CSVファイルを読み込む。Excel由来のBOMを考慮し,既定をutf-8-sigにする。"""
return read_core_csv(Path(path).read_text(encoding=encoding))
read_core_csv_file() は,invoice.csv を読み込み,1行を1つの辞書として扱えるようにする。
[
{
"dInvoice": "1",
"invoiceNumber": "INV-26-861",
"issueDate": "2026-06-20",
},
{
"dInvoice": "1",
"dInvoiceParty": "1",
"partyRole": "Seller",
"partyName": "売手株式会社",
"partyTaxID": "T1234567890123",
},
]
1行が1つの辞書になり,CSVの列名をキーとして値を取得できる。
6. 構文結合表CSVを読み込む
bindings.csv は,semantic_path と xpath の2列を持つCSVである。
def read_bindings_csv(text: str) -> list[Binding]:
"""semantic_path,xpath の2列を持つ構文結合表CSVを読み込む。"""
bindings: list[Binding] = []
reader = csv.DictReader(io.StringIO(text))
if reader.fieldnames is None:
raise ValueError("Binding CSV has no header")
# UTF-8 BOM対策。
fieldnames = [name.lstrip("\ufeff") for name in reader.fieldnames]
semantic_col = "semantic_path"
xpath_col = "xpath"
if semantic_col not in fieldnames or xpath_col not in fieldnames:
raise ValueError("Binding CSV must have semantic_path and xpath columns")
for line_no, row in enumerate(reader, start=2):
normalized = {k.lstrip("\ufeff"): (v or "").strip() for k, v in row.items()}
semantic_path = normalized.get(semantic_col, "")
xpath = normalized.get(xpath_col, "")
if not semantic_path and not xpath:
continue
if not semantic_path or not xpath:
raise ValueError(f"Incomplete binding at line {line_no}: {row!r}")
if not xpath.startswith("/Invoice/") and xpath != "/Invoice":
raise ValueError(f"XPath must start with /Invoice at line {line_no}: {xpath!r}")
bindings.append((semantic_path, xpath))
return bindings
def read_bindings_csv_file(path: str | Path, encoding: str = "utf-8-sig") -> list[Binding]:
return read_bindings_csv(Path(path).read_text(encoding=encoding))
この関数により,構文結合表は次のようなタプルのリストになる。
[
("$.Invoice.invoiceNumber", "/Invoice/cbc:ID"),
("$.Invoice.dInvoiceParty[?@.partyRole=Seller].partyName",
"/Invoice/cac:AccountingSupplierParty/cac:Party/cac:PartyName/cbc:Name"),
]
ここで重要なのは,対応関係がプログラム定数ではなく,外部ファイルとして差し替え可能になっていることである。
7. SemanticPathで値を取得する
本稿のサンプルでは,次の2種類のSemanticPathだけを扱う。
$.Invoice.field
$.Invoice.dimension[?@.filterField=filterValue].field
例えば,請求書番号は次の形式である。
$.Invoice.invoiceNumber
売手名称は次の形式である。
$.Invoice.dInvoiceParty[?@.partyRole=Seller].partyName
これを処理する関数は,次のとおりである。
def rows_by_dimension(
rows: list[dict[str, str]],
dimension: str,
d_invoice: str = "1",
) -> list[dict[str, str]]:
return [r for r in rows if r.get("dInvoice") == d_invoice and r.get(dimension)]
def first_value(
rows: list[dict[str, str]],
column: str,
d_invoice: str = "1",
) -> str:
for row in rows:
if row.get("dInvoice") == d_invoice and row.get(column):
return row[column]
return ""
def query_semantic_path(
rows: list[dict[str, str]],
path: str,
d_invoice: str = "1",
) -> str:
"""
このサンプルで扱う SemanticPath subset:
$.Invoice.field
$.Invoice.dimension[?@.filterField="filterValue"].field
$.Invoice.dimension[?@.filterField=filterValue].field
"""
m = re.fullmatch(r"\$\.Invoice\.([A-Za-z_][A-Za-z0-9_]*)", path)
if m:
return first_value(rows, m.group(1), d_invoice)
m = re.fullmatch(
r"\$\.Invoice\.([A-Za-z_][A-Za-z0-9_]*)"
r"\[\?@\.([A-Za-z_][A-Za-z0-9_]*)=(?:\"([^\"]+)\"|([^\]]+))\]"
r"\.([A-Za-z_][A-Za-z0-9_]*)",
path,
)
if m:
dimension, filter_field, quoted_value, bare_value, target_field = m.groups()
filter_value = quoted_value if quoted_value is not None else bare_value
for row in rows_by_dimension(rows, dimension, d_invoice):
if row.get(filter_field) == filter_value:
return row.get(target_field, "")
return ""
raise ValueError(f"Unsupported SemanticPath: {path}")
この関数は,汎用XPathや汎用JSONPathを実装しているわけではない。構文結合表で必要となる最小限の SemanticPath subset を実装している。
実務上も,最初からすべての式を許容するより,構文結合表で使用できる記法を限定した方が安全である。
8. ElementTreeでXMLを扱う
JP PINT UBLはXML名前空間を使用する。ElementTreeで名前空間付きXMLを扱うため,まず名前空間を定義する。
UBL_NS = "urn:oasis:names:specification:ubl:schema:xsd:Invoice-2"
CBC_NS = "urn:oasis:names:specification:ubl:schema:xsd:CommonBasicComponents-2"
CAC_NS = "urn:oasis:names:specification:ubl:schema:xsd:CommonAggregateComponents-2"
NS = {
"": UBL_NS,
"cbc": CBC_NS,
"cac": CAC_NS,
}
for prefix, uri in NS.items():
ET.register_namespace(prefix, uri)
def qname(name: str) -> str:
if ":" in name:
prefix, local = name.split(":", 1)
return f"{{{NS[prefix]}}}{local}"
return f"{{{UBL_NS}}}{name}"
ElementTreeでは,名前空間付き要素を次の形式で指定する。
{名前空間URI}ローカル名
例えば,cbc:ID は内部的には次の名前になる。
{urn:oasis:names:specification:ubl:schema:xsd:CommonBasicComponents-2}ID
毎回この文字列を書くのは煩雑なので,接頭辞付きの名前からElementTree用の名前へ変換する関数 qname() を用意している。
9. XPathに沿って要素を生成する
構文結合表には,次のようなXPathが定義されている。
/Invoice/cac:AccountingSupplierParty/cac:Party/cac:PartyName/cbc:Name
このパスに沿って,Invoiceに追加すべき要素を順番に作成する。
def normalize_xpath(xpath: str) -> str:
xpath = xpath.strip()
# 結合表の正規表記では,途中空白を許容しない。
if re.search(r"\s", xpath):
raise ValueError(f"XPath contains whitespace: {xpath!r}")
return xpath
def ensure_path(root: ET.Element, xpath: str) -> ET.Element:
xpath = normalize_xpath(xpath)
parts = [p for p in xpath.split("/") if p]
if not parts or parts[0] != "Invoice":
raise ValueError(f"XPath must start with /Invoice: {xpath}")
elem = root
for part in parts[1:]:
tag = qname(part)
child = elem.find(tag)
if child is None:
child = ET.SubElement(elem, tag)
elem = child
return elem
def set_text(root: ET.Element, xpath: str, value: str | None) -> None:
if value not in (None, ""):
ensure_path(root, xpath).text = str(value)
def get_text(root: ET.Element, xpath: str) -> str:
xpath = normalize_xpath(xpath)
parts = [p for p in xpath.split("/") if p]
if not parts or parts[0] != "Invoice":
raise ValueError(f"XPath must start with /Invoice: {xpath}")
elem = root
for part in parts[1:]:
elem = elem.find(qname(part))
if elem is None:
return ""
return elem.text or ""
ensure_path() は,指定されたXPathの最後の要素を返す。
例えば,次の呼び出しを行う。
elem = ensure_path(
invoice,
"/Invoice/cac:AccountingSupplierParty/cac:Party/cac:PartyName/cbc:Name"
)
elem.text = "売手株式会社"
これにより,次のようなXML要素が生成される。
<cac:AccountingSupplierParty>
<cac:Party>
<cac:PartyName>
<cbc:Name>売手株式会社</cbc:Name>
</cac:PartyName>
</cac:Party>
</cac:AccountingSupplierParty>
10. 構文結合表を使ってCSVからXMLを作る
CSVからXMLへの変換は,単純項目については次の3段階で処理できる。
1. SemanticPathでCSVから値を取得する。 2. XPathでXML上の設定先を決める。 3. 取得した値をXML要素に設定する。
def csv_to_ubl_xml(
rows: list[dict[str, str]],
bindings: list[Binding],
d_invoice: str = "1",
) -> ET.Element:
invoice = ET.Element(qname("Invoice"))
# 単一項目及び条件付き単一項目は,構文結合表に従って設定する。
for semantic_path, xpath in bindings:
value = query_semantic_path(rows, semantic_path, d_invoice)
set_text(invoice, xpath, value)
currency = query_semantic_path(rows, "$.Invoice.currencyCode", d_invoice)
# 繰返し構造は,行の集合をXMLの繰返し要素へ展開する。
append_tax_total(invoice, rows, currency, d_invoice)
append_invoice_lines(invoice, rows, currency, d_invoice)
return invoice
この処理では,プログラム本体は「売手はこのXML要素に入れる」「買手はこのXML要素に入れる」といった業務固有の対応を知らない。
対応関係は bindings.csv に定義されている。したがって,別のXML構文へ変換したい場合でも,基本的には bindings.csv を差し替えればよい。
11. 繰返し構造の扱い
請求書番号や発行日は,1つの請求書に1回だけ現れる項目である。
一方,税率別内訳や請求明細は繰返し構造である。
| 構造 | CSV側 | XML側 |
|---|---|---|
|
請求書ヘッダー |
|
|
|
当事者 |
|
|
|
税率別内訳 |
|
|
|
請求明細 |
|
|
このため,サンプルプログラムでは,単純項目は bindings.csv で処理し,dTaxBreakdown と dInvoiceLine は繰返し生成規則として処理している。
11.1. 税率別内訳をXMLへ変換する
構造化CSVでは,税率別内訳は dTaxBreakdown に値がある行として表される。
def append_tax_total(
invoice: ET.Element,
rows: list[dict[str, str]],
currency: str,
d_invoice: str = "1",
) -> None:
tax_rows = rows_by_dimension(rows, "dTaxBreakdown", d_invoice)
if not tax_rows:
return
tax_total = ET.SubElement(invoice, qname("cac:TaxTotal"))
total_tax_amount = sum(Decimal(r.get("taxAmount") or "0") for r in tax_rows)
ET.SubElement(
tax_total,
qname("cbc:TaxAmount"),
{"currencyID": currency},
).text = str(total_tax_amount)
for tax_row in tax_rows:
subtotal = ET.SubElement(tax_total, qname("cac:TaxSubtotal"))
ET.SubElement(
subtotal,
qname("cbc:TaxableAmount"),
{"currencyID": currency},
).text = tax_row["taxableAmount"]
ET.SubElement(
subtotal,
qname("cbc:TaxAmount"),
{"currencyID": currency},
).text = tax_row["taxAmount"]
category = ET.SubElement(subtotal, qname("cac:TaxCategory"))
ET.SubElement(category, qname("cbc:ID")).text = tax_row["taxCategoryCode"]
ET.SubElement(category, qname("cbc:Percent")).text = tax_row["taxRate"]
scheme = ET.SubElement(category, qname("cac:TaxScheme"))
ET.SubElement(scheme, qname("cbc:ID")).text = "VAT"
この処理は,次のように考えられる。
dTaxBreakdown の各行を取り出す。 各行について cac:TaxSubtotal を作成する。 taxableAmount を cbc:TaxableAmount へ設定する。 taxAmount を cbc:TaxAmount へ設定する。 taxCategoryCode と taxRate を cac:TaxCategory へ設定する。
11.2. 請求明細をXMLへ変換する
請求明細は,dInvoiceLine に値がある行として表される。
def append_invoice_lines(
invoice: ET.Element,
rows: list[dict[str, str]],
currency: str,
d_invoice: str = "1",
) -> None:
for line_row in rows_by_dimension(rows, "dInvoiceLine", d_invoice):
line = ET.SubElement(invoice, qname("cac:InvoiceLine"))
ET.SubElement(line, qname("cbc:ID")).text = line_row["lineID"]
ET.SubElement(
line,
qname("cbc:InvoicedQuantity"),
{"unitCode": line_row.get("unitCode") or "EA"},
).text = line_row["invoicedQuantity"]
ET.SubElement(
line,
qname("cbc:LineExtensionAmount"),
{"currencyID": currency},
).text = line_row["lineExtensionAmount"]
item = ET.SubElement(line, qname("cac:Item"))
ET.SubElement(item, qname("cbc:Name")).text = line_row["itemName"]
if line_row.get("lineTaxCategoryCode"):
tax_category = ET.SubElement(item, qname("cac:ClassifiedTaxCategory"))
ET.SubElement(tax_category, qname("cbc:ID")).text = line_row[
"lineTaxCategoryCode"
]
ET.SubElement(tax_category, qname("cbc:Percent")).text = line_row[
"lineTaxRate"
]
scheme = ET.SubElement(tax_category, qname("cac:TaxScheme"))
ET.SubElement(scheme, qname("cbc:ID")).text = "VAT"
price = ET.SubElement(line, qname("cac:Price"))
ET.SubElement(
price,
qname("cbc:PriceAmount"),
{"currencyID": currency},
).text = line_row["unitPriceAmount"]
構造化CSV側では,請求明細は1行で表されている。
1,,,,,1,,,,,,,,,,,,,,,,,,INV-26-861-1,商品A,S,10,10,EA,1000,10000
これが,XMLでは cac:InvoiceLine,cac:Item,cac:Price などの階層要素として展開される。
12. XMLからCSVを生成する
逆方向では,XPathに対応するXML要素を探し,その値をCSV列へ戻す。
def new_empty_row(fieldnames: list[str], d_invoice: str = "1") -> dict[str, str]:
row = {name: "" for name in fieldnames}
row["dInvoice"] = d_invoice
return row
def append_header_row(
rows: list[dict[str, str]],
invoice: ET.Element,
fieldnames: list[str],
d_invoice: str = "1",
) -> None:
row = new_empty_row(fieldnames, d_invoice)
row["invoiceNumber"] = get_text(invoice, "/Invoice/cbc:ID")
row["issueDate"] = get_text(invoice, "/Invoice/cbc:IssueDate")
row["invoiceTypeCode"] = get_text(invoice, "/Invoice/cbc:InvoiceTypeCode")
row["currencyCode"] = get_text(invoice, "/Invoice/cbc:DocumentCurrencyCode")
rows.append(row)
def append_party_rows(
rows: list[dict[str, str]],
invoice: ET.Element,
fieldnames: list[str],
d_invoice: str = "1",
) -> None:
party_specs = [
("1", "Seller", "/Invoice/cac:AccountingSupplierParty/cac:Party"),
("2", "Buyer", "/Invoice/cac:AccountingCustomerParty/cac:Party"),
]
for d_party, role, party_xpath in party_specs:
parts = [p for p in party_xpath.split("/") if p]
party = invoice
for part in parts[1:]:
party = party.find(qname(part))
if party is None:
break
if party is None:
continue
row = new_empty_row(fieldnames, d_invoice)
row["dInvoiceParty"] = d_party
row["partyRole"] = role
name = party.find(f"{qname('cac:PartyName')}/{qname('cbc:Name')}")
tax_id = party.find(f"{qname('cac:PartyTaxScheme')}/{qname('cbc:CompanyID')}")
row["partyName"] = name.text if name is not None and name.text else ""
row["partyTaxID"] = tax_id.text if tax_id is not None and tax_id.text else ""
rows.append(row)
def append_document_reference_rows(
rows: list[dict[str, str]],
invoice: ET.Element,
fieldnames: list[str],
d_invoice: str = "1",
) -> None:
ref_specs = [
("Order", "/Invoice/cac:OrderReference/cbc:ID"),
("Delivery", "/Invoice/cac:DespatchDocumentReference/cbc:ID"),
]
ref_no = 1
for ref_type, ref_xpath in ref_specs:
ref_id = get_text(invoice, ref_xpath)
if ref_id:
row = new_empty_row(fieldnames, d_invoice)
row["dDocumentReference"] = str(ref_no)
row["referenceType"] = ref_type
row["referenceID"] = ref_id
rows.append(row)
ref_no += 1
def append_payment_row(
rows: list[dict[str, str]],
invoice: ET.Element,
fieldnames: list[str],
d_invoice: str = "1",
) -> None:
due_date = get_text(invoice, "/Invoice/cbc:DueDate")
payment_id = get_text(invoice, "/Invoice/cac:PaymentMeans/cbc:PaymentID")
if due_date or payment_id:
row = new_empty_row(fieldnames, d_invoice)
row["dPayment"] = "1"
row["paymentDueDate"] = due_date
row["paymentReference"] = payment_id
rows.append(row)
def append_tax_breakdown_rows(
rows: list[dict[str, str]],
invoice: ET.Element,
fieldnames: list[str],
d_invoice: str = "1",
) -> None:
tax_total = invoice.find(qname("cac:TaxTotal"))
if tax_total is None:
return
for i, subtotal in enumerate(tax_total.findall(qname("cac:TaxSubtotal")), start=1):
row = new_empty_row(fieldnames, d_invoice)
row["dTaxBreakdown"] = str(i)
taxable = subtotal.find(qname("cbc:TaxableAmount"))
tax_amount = subtotal.find(qname("cbc:TaxAmount"))
category = subtotal.find(qname("cac:TaxCategory"))
row["taxableAmount"] = taxable.text if taxable is not None and taxable.text else ""
row["taxAmount"] = tax_amount.text if tax_amount is not None and tax_amount.text else ""
if category is not None:
cat_id = category.find(qname("cbc:ID"))
percent = category.find(qname("cbc:Percent"))
row["taxCategoryCode"] = cat_id.text if cat_id is not None and cat_id.text else ""
row["taxRate"] = percent.text if percent is not None and percent.text else ""
rows.append(row)
def append_invoice_line_rows(
rows: list[dict[str, str]],
invoice: ET.Element,
fieldnames: list[str],
d_invoice: str = "1",
) -> None:
for i, line in enumerate(invoice.findall(qname("cac:InvoiceLine")), start=1):
row = new_empty_row(fieldnames, d_invoice)
row["dInvoiceLine"] = str(i)
line_id = line.find(qname("cbc:ID"))
quantity = line.find(qname("cbc:InvoicedQuantity"))
line_amount = line.find(qname("cbc:LineExtensionAmount"))
item = line.find(qname("cac:Item"))
price = line.find(qname("cac:Price"))
row["lineID"] = line_id.text if line_id is not None and line_id.text else ""
if quantity is not None:
row["invoicedQuantity"] = quantity.text or ""
row["unitCode"] = quantity.attrib.get("unitCode", "")
row["lineExtensionAmount"] = line_amount.text if line_amount is not None and line_amount.text else ""
if item is not None:
name = item.find(qname("cbc:Name"))
row["itemName"] = name.text if name is not None and name.text else ""
tax_category = item.find(qname("cac:ClassifiedTaxCategory"))
if tax_category is not None:
cat_id = tax_category.find(qname("cbc:ID"))
percent = tax_category.find(qname("cbc:Percent"))
row["lineTaxCategoryCode"] = cat_id.text if cat_id is not None and cat_id.text else ""
row["lineTaxRate"] = percent.text if percent is not None and percent.text else ""
if price is not None:
price_amount = price.find(qname("cbc:PriceAmount"))
row["unitPriceAmount"] = price_amount.text if price_amount is not None and price_amount.text else ""
rows.append(row)
def ubl_xml_to_csv_rows(
invoice: ET.Element,
fieldnames: list[str],
d_invoice: str = "1",
) -> list[dict[str, str]]:
rows: list[dict[str, str]] = []
append_header_row(rows, invoice, fieldnames, d_invoice)
append_party_rows(rows, invoice, fieldnames, d_invoice)
append_document_reference_rows(rows, invoice, fieldnames, d_invoice)
append_payment_row(rows, invoice, fieldnames, d_invoice)
append_tax_breakdown_rows(rows, invoice, fieldnames, d_invoice)
append_invoice_line_rows(rows, invoice, fieldnames, d_invoice)
return rows
この処理により,XML上のヘッダー,当事者,参照文書,支払情報,税率別内訳,請求明細が,構造化CSVの行として再構成される。
13. 出力処理とmain関数
最後に,XMLとCSVを書き出す。
def rows_to_csv(rows: list[dict[str, str]], fieldnames: list[str]) -> str:
out = io.StringIO()
writer = csv.DictWriter(out, fieldnames=fieldnames, lineterminator="\n")
writer.writeheader()
writer.writerows(rows)
return out.getvalue()
def pretty_xml(elem: ET.Element) -> str:
ET.indent(elem, space=" ")
return ET.tostring(elem, encoding="unicode")
def write_xml(elem: ET.Element, path: str | Path) -> None:
tree = ET.ElementTree(elem)
ET.indent(tree, space=" ")
tree.write(path, encoding="utf-8", xml_declaration=True)
コマンドライン引数の処理は次のとおりである。
def parse_args(argv: list[str]) -> argparse.Namespace:
parser = argparse.ArgumentParser(
description="Convert structured invoice CSV to a JP PINT UBL subset and back."
)
parser.add_argument("--input-csv", default="invoice.csv", help="structured invoice CSV")
parser.add_argument("--bindings-csv", default="bindings.csv", help="semantic_path,xpath binding CSV")
parser.add_argument("--xml-out", default="invoice.xml", help="output XML file")
parser.add_argument("--roundtrip-csv", default="roundtrip.csv", help="round-trip CSV output")
parser.add_argument("--d-invoice", default="1", help="target dInvoice value")
parser.add_argument("--print-xml", action="store_true", help="print generated XML to stdout")
parser.add_argument("--print-csv", action="store_true", help="print round-trip CSV to stdout")
return parser.parse_args(argv)
def main(argv: list[str] | None = None) -> int:
args = parse_args(sys.argv[1:] if argv is None else argv)
rows = read_core_csv_file(args.input_csv)
if not rows:
raise ValueError(f"No rows in input CSV: {args.input_csv}")
bindings = read_bindings_csv_file(args.bindings_csv)
fieldnames = list(rows[0].keys())
invoice_xml = csv_to_ubl_xml(rows, bindings, args.d_invoice)
write_xml(invoice_xml, args.xml_out)
roundtrip_rows = ubl_xml_to_csv_rows(invoice_xml, fieldnames, args.d_invoice)
Path(args.roundtrip_csv).write_text(
rows_to_csv(roundtrip_rows, fieldnames),
encoding="utf-8-sig",
)
if args.print_xml:
print("=== CSV -> JP PINT UBL subset ===")
print(pretty_xml(invoice_xml))
if args.print_csv:
print("=== JP PINT UBL subset -> structured CSV ===")
print(rows_to_csv(roundtrip_rows, fieldnames))
print(f"XML written: {args.xml_out}")
print(f"CSV written: {args.roundtrip_csv}")
return 0
if __name__ == "__main__":
raise SystemExit(main())
14. 実行結果
実行すると,invoice.xml と roundtrip.csv が作成される。
python3 syntax_binding.py \
--input-csv invoice.csv \
--bindings-csv bindings.csv \
--xml-out invoice.xml \
--roundtrip-csv roundtrip.csv \
--print-xml
生成されるXMLの一部は次のようになる。
<?xml version='1.0' encoding='utf-8'?>
<Invoice xmlns="urn:oasis:names:specification:ubl:schema:xsd:Invoice-2" xmlns:cac="urn:oasis:names:specification:ubl:schema:xsd:CommonAggregateComponents-2" xmlns:cbc="urn:oasis:names:specification:ubl:schema:xsd:CommonBasicComponents-2">
<cbc:ID>INV-26-861</cbc:ID>
<cbc:IssueDate>2026-06-20</cbc:IssueDate>
<cbc:InvoiceTypeCode>380</cbc:InvoiceTypeCode>
<cbc:DocumentCurrencyCode>JPY</cbc:DocumentCurrencyCode>
<cac:AccountingSupplierParty>
<cac:Party>
<cac:PartyName>
<cbc:Name>売手株式会社</cbc:Name>
</cac:PartyName>
<cac:PartyTaxScheme>
<cbc:CompanyID>T1234567890123</cbc:CompanyID>
</cac:PartyTaxScheme>
</cac:Party>
</cac:AccountingSupplierParty>
<cac:AccountingCustomerParty>
<cac:Party>
<cac:PartyName>
<cbc:Name>買手株式会社</cbc:Name>
</cac:PartyName>
</cac:Party>
</cac:AccountingCustomerParty>
<cac:OrderReference>
<cbc:ID>PO-2026-0001</cbc:ID>
</cac:OrderReference>
<cac:DespatchDocumentReference>
<cbc:ID>DN-2026-0001</cbc:ID>
</cac:DespatchDocumentReference>
<cbc:DueDate>2026-07-31</cbc:DueDate>
<cac:PaymentMeans>
<cbc:PaymentID>INV-2026-0001</cbc:PaymentID>
</cac:PaymentMeans>
<cac:TaxTotal>
<cbc:TaxAmount currencyID="JPY">1472</cbc:TaxAmount>
XMLを読み戻して生成したCSVは次のようになる。
dInvoice,dInvoiceParty,dDocumentReference,dPayment,dTaxBreakdown,dInvoiceLine,invoiceNumber,issueDate,invoiceTypeCode,currencyCode,totalAmount,partyRole,partyName,partyTaxID,referenceType,referenceID,paymentDueDate,paymentMeansCode,paymentReference,taxCategoryCode,taxRate,taxableAmount,taxAmount,lineID,itemName,lineTaxCategoryCode,lineTaxRate,invoicedQuantity,unitCode,unitPriceAmount,lineExtensionAmount
1,,,,,,INV-26-861,2026-06-20,380,JPY,,,,,,,,,,,,,,,,,,,,,
1,1,,,,,,,,,,Seller,売手株式会社,T1234567890123,,,,,,,,,,,,,,,,,
1,2,,,,,,,,,,Buyer,買手株式会社,,,,,,,,,,,,,,,,,,
1,,1,,,,,,,,,,,,Order,PO-2026-0001,,,,,,,,,,,,,,,
1,,2,,,,,,,,,,,,Delivery,DN-2026-0001,,,,,,,,,,,,,,,
1,,,1,,,,,,,,,,,,,2026-07-31,,INV-2026-0001,,,,,,,,,,,,
1,,,,1,,,,,,,,,,,,,,,S,10,12320,1232,,,,,,,,
1,,,,2,,,,,,,,,,,,,,,AA,8,3010,240,,,,,,,,
1,,,,,1,,,,,,,,,,,,,,,,,,INV-26-861-1,商品A,S,10,10,EA,1000,10000
1,,,,,2,,,,,,,,,,,,,,,,,,INV-26-861-2,商品B,S,10,4,EA,580,2320
1,,,,,3,,,,,,,,,,,,,,,,,,INV-26-861-3,食品C,AA,8,7,EA,430,3010
15. プログラマ向け補足
15.1. なぜ構文結合表をCSVに出すのか
構文結合表をPythonコードに埋め込むと,対応先のXML構文が変わるたびにプログラムを修正する必要がある。
一方,bindings.csv として外部化すれば,プログラム本体は次の汎用処理に集中できる。
CSVを読み込む。 SemanticPathで値を取得する。 XPathでXML要素を作る。 値を設定する。
この方式では,別のXML構文又はJSON構文へ対応する場合にも,まず変更対象となるのは結合表である。
15.2. 限定されたSemanticPath subset
このサンプルでは,SemanticPathを意図的に限定している。
$.Invoice.field
$.Invoice.dimension[?@.filterField=filterValue].field
これは,構文結合表で実際に必要な指定だけを安全に扱うためである。汎用式をすべて許可すると,実装が複雑になり,検証も難しくなる。
15.3. 繰返し構造は別規則にしている
dTaxBreakdown や dInvoiceLine は,単一の値を単一のXPathへ写すだけでは表現できない。
そのため,本サンプルでは,単純項目は bindings.csv で処理し,繰返し構造は append_tax_total() と append_invoice_lines() で処理している。
実運用では,繰返し構造の生成規則も結合定義側へ外部化していくことが次の課題になる。
16. おわりに
構文結合表を用いると,変換プログラムの考え方が変わる。
従来は,変換元項目と変換先項目をプログラム中に直接書き込むことが多かった。
しかし,SemanticPath と XPath の対応を構文結合表として定義すれば,プログラム本体は,次の汎用処理に集中できる。
SemanticPathで値を取得する。 XPathでXML要素を作る。 値を設定する。 逆方向では,XPathで値を取得し,構造化CSV行へ戻す。
今回の改訂では,その第一歩として,請求書CSVと構文結合表を外部CSVとして読み込む形にした。
日本版コアインボイス(構造化CSV)は,JP PINTを置き換えるものではない。JP PINT UBL,既存EDI,ERP,会計システムの間に入り,それぞれの構文を共通の意味に結び付けるための中間的なセマンティックモデルである。
その実装イメージをつかむうえで,今回のような小さなPythonプログラムは有効である。
17. 参考:サンプルソース
# tag::imports[]
import argparse
import csv
import io
import re
import sys
import xml.etree.ElementTree as ET
from decimal import Decimal
from pathlib import Path
# end::imports[]
Binding = tuple[str, str]
# tag::namespace[]
UBL_NS = "urn:oasis:names:specification:ubl:schema:xsd:Invoice-2"
CBC_NS = "urn:oasis:names:specification:ubl:schema:xsd:CommonBasicComponents-2"
CAC_NS = "urn:oasis:names:specification:ubl:schema:xsd:CommonAggregateComponents-2"
NS = {
"": UBL_NS,
"cbc": CBC_NS,
"cac": CAC_NS,
}
for prefix, uri in NS.items():
ET.register_namespace(prefix, uri)
def qname(name: str) -> str:
if ":" in name:
prefix, local = name.split(":", 1)
return f"{{{NS[prefix]}}}{local}"
return f"{{{UBL_NS}}}{name}"
# end::namespace[]
# tag::read_csv[]
def read_core_csv(text: str) -> list[dict[str, str]]:
"""CSV文字列を辞書行のリストとして読み込む。"""
return list(csv.DictReader(io.StringIO(text)))
def read_core_csv_file(path: str | Path, encoding: str = "utf-8-sig") -> list[dict[str, str]]:
"""構造化CSVファイルを読み込む。Excel由来のBOMを考慮し,既定をutf-8-sigにする。"""
return read_core_csv(Path(path).read_text(encoding=encoding))
# end::read_csv[]
# tag::binding_read[]
def read_bindings_csv(text: str) -> list[Binding]:
"""semantic_path,xpath の2列を持つ構文結合表CSVを読み込む。"""
bindings: list[Binding] = []
reader = csv.DictReader(io.StringIO(text))
if reader.fieldnames is None:
raise ValueError("Binding CSV has no header")
# UTF-8 BOM対策。
fieldnames = [name.lstrip("\ufeff") for name in reader.fieldnames]
semantic_col = "semantic_path"
xpath_col = "xpath"
if semantic_col not in fieldnames or xpath_col not in fieldnames:
raise ValueError("Binding CSV must have semantic_path and xpath columns")
for line_no, row in enumerate(reader, start=2):
normalized = {k.lstrip("\ufeff"): (v or "").strip() for k, v in row.items()}
semantic_path = normalized.get(semantic_col, "")
xpath = normalized.get(xpath_col, "")
if not semantic_path and not xpath:
continue
if not semantic_path or not xpath:
raise ValueError(f"Incomplete binding at line {line_no}: {row!r}")
if not xpath.startswith("/Invoice/") and xpath != "/Invoice":
raise ValueError(f"XPath must start with /Invoice at line {line_no}: {xpath!r}")
bindings.append((semantic_path, xpath))
return bindings
def read_bindings_csv_file(path: str | Path, encoding: str = "utf-8-sig") -> list[Binding]:
return read_bindings_csv(Path(path).read_text(encoding=encoding))
# end::binding_read[]
# tag::xml_path[]
def normalize_xpath(xpath: str) -> str:
xpath = xpath.strip()
# 結合表の正規表記では,途中空白を許容しない。
if re.search(r"\s", xpath):
raise ValueError(f"XPath contains whitespace: {xpath!r}")
return xpath
def ensure_path(root: ET.Element, xpath: str) -> ET.Element:
xpath = normalize_xpath(xpath)
parts = [p for p in xpath.split("/") if p]
if not parts or parts[0] != "Invoice":
raise ValueError(f"XPath must start with /Invoice: {xpath}")
elem = root
for part in parts[1:]:
tag = qname(part)
child = elem.find(tag)
if child is None:
child = ET.SubElement(elem, tag)
elem = child
return elem
def set_text(root: ET.Element, xpath: str, value: str | None) -> None:
if value not in (None, ""):
ensure_path(root, xpath).text = str(value)
def get_text(root: ET.Element, xpath: str) -> str:
xpath = normalize_xpath(xpath)
parts = [p for p in xpath.split("/") if p]
if not parts or parts[0] != "Invoice":
raise ValueError(f"XPath must start with /Invoice: {xpath}")
elem = root
for part in parts[1:]:
elem = elem.find(qname(part))
if elem is None:
return ""
return elem.text or ""
# end::xml_path[]
# tag::semantic_path[]
def rows_by_dimension(
rows: list[dict[str, str]],
dimension: str,
d_invoice: str = "1",
) -> list[dict[str, str]]:
return [r for r in rows if r.get("dInvoice") == d_invoice and r.get(dimension)]
def first_value(
rows: list[dict[str, str]],
column: str,
d_invoice: str = "1",
) -> str:
for row in rows:
if row.get("dInvoice") == d_invoice and row.get(column):
return row[column]
return ""
def query_semantic_path(
rows: list[dict[str, str]],
path: str,
d_invoice: str = "1",
) -> str:
"""
このサンプルで扱う SemanticPath subset:
$.Invoice.field
$.Invoice.dimension[?@.filterField="filterValue"].field
$.Invoice.dimension[?@.filterField=filterValue].field
"""
m = re.fullmatch(r"\$\.Invoice\.([A-Za-z_][A-Za-z0-9_]*)", path)
if m:
return first_value(rows, m.group(1), d_invoice)
m = re.fullmatch(
r"\$\.Invoice\.([A-Za-z_][A-Za-z0-9_]*)"
r"\[\?@\.([A-Za-z_][A-Za-z0-9_]*)=(?:\"([^\"]+)\"|([^\]]+))\]"
r"\.([A-Za-z_][A-Za-z0-9_]*)",
path,
)
if m:
dimension, filter_field, quoted_value, bare_value, target_field = m.groups()
filter_value = quoted_value if quoted_value is not None else bare_value
for row in rows_by_dimension(rows, dimension, d_invoice):
if row.get(filter_field) == filter_value:
return row.get(target_field, "")
return ""
raise ValueError(f"Unsupported SemanticPath: {path}")
# end::semantic_path[]
# tag::tax_total[]
def append_tax_total(
invoice: ET.Element,
rows: list[dict[str, str]],
currency: str,
d_invoice: str = "1",
) -> None:
tax_rows = rows_by_dimension(rows, "dTaxBreakdown", d_invoice)
if not tax_rows:
return
tax_total = ET.SubElement(invoice, qname("cac:TaxTotal"))
total_tax_amount = sum(Decimal(r.get("taxAmount") or "0") for r in tax_rows)
ET.SubElement(
tax_total,
qname("cbc:TaxAmount"),
{"currencyID": currency},
).text = str(total_tax_amount)
for tax_row in tax_rows:
subtotal = ET.SubElement(tax_total, qname("cac:TaxSubtotal"))
ET.SubElement(
subtotal,
qname("cbc:TaxableAmount"),
{"currencyID": currency},
).text = tax_row["taxableAmount"]
ET.SubElement(
subtotal,
qname("cbc:TaxAmount"),
{"currencyID": currency},
).text = tax_row["taxAmount"]
category = ET.SubElement(subtotal, qname("cac:TaxCategory"))
ET.SubElement(category, qname("cbc:ID")).text = tax_row["taxCategoryCode"]
ET.SubElement(category, qname("cbc:Percent")).text = tax_row["taxRate"]
scheme = ET.SubElement(category, qname("cac:TaxScheme"))
ET.SubElement(scheme, qname("cbc:ID")).text = "VAT"
# end::tax_total[]
# tag::invoice_lines[]
def append_invoice_lines(
invoice: ET.Element,
rows: list[dict[str, str]],
currency: str,
d_invoice: str = "1",
) -> None:
for line_row in rows_by_dimension(rows, "dInvoiceLine", d_invoice):
line = ET.SubElement(invoice, qname("cac:InvoiceLine"))
ET.SubElement(line, qname("cbc:ID")).text = line_row["lineID"]
ET.SubElement(
line,
qname("cbc:InvoicedQuantity"),
{"unitCode": line_row.get("unitCode") or "EA"},
).text = line_row["invoicedQuantity"]
ET.SubElement(
line,
qname("cbc:LineExtensionAmount"),
{"currencyID": currency},
).text = line_row["lineExtensionAmount"]
item = ET.SubElement(line, qname("cac:Item"))
ET.SubElement(item, qname("cbc:Name")).text = line_row["itemName"]
if line_row.get("lineTaxCategoryCode"):
tax_category = ET.SubElement(item, qname("cac:ClassifiedTaxCategory"))
ET.SubElement(tax_category, qname("cbc:ID")).text = line_row[
"lineTaxCategoryCode"
]
ET.SubElement(tax_category, qname("cbc:Percent")).text = line_row[
"lineTaxRate"
]
scheme = ET.SubElement(tax_category, qname("cac:TaxScheme"))
ET.SubElement(scheme, qname("cbc:ID")).text = "VAT"
price = ET.SubElement(line, qname("cac:Price"))
ET.SubElement(
price,
qname("cbc:PriceAmount"),
{"currencyID": currency},
).text = line_row["unitPriceAmount"]
# end::invoice_lines[]
# tag::csv_to_xml[]
def csv_to_ubl_xml(
rows: list[dict[str, str]],
bindings: list[Binding],
d_invoice: str = "1",
) -> ET.Element:
invoice = ET.Element(qname("Invoice"))
# 単一項目及び条件付き単一項目は,構文結合表に従って設定する。
for semantic_path, xpath in bindings:
value = query_semantic_path(rows, semantic_path, d_invoice)
set_text(invoice, xpath, value)
currency = query_semantic_path(rows, "$.Invoice.currencyCode", d_invoice)
# 繰返し構造は,行の集合をXMLの繰返し要素へ展開する。
append_tax_total(invoice, rows, currency, d_invoice)
append_invoice_lines(invoice, rows, currency, d_invoice)
return invoice
# end::csv_to_xml[]
# tag::xml_to_csv[]
def new_empty_row(fieldnames: list[str], d_invoice: str = "1") -> dict[str, str]:
row = {name: "" for name in fieldnames}
row["dInvoice"] = d_invoice
return row
def append_header_row(
rows: list[dict[str, str]],
invoice: ET.Element,
fieldnames: list[str],
d_invoice: str = "1",
) -> None:
row = new_empty_row(fieldnames, d_invoice)
row["invoiceNumber"] = get_text(invoice, "/Invoice/cbc:ID")
row["issueDate"] = get_text(invoice, "/Invoice/cbc:IssueDate")
row["invoiceTypeCode"] = get_text(invoice, "/Invoice/cbc:InvoiceTypeCode")
row["currencyCode"] = get_text(invoice, "/Invoice/cbc:DocumentCurrencyCode")
rows.append(row)
def append_party_rows(
rows: list[dict[str, str]],
invoice: ET.Element,
fieldnames: list[str],
d_invoice: str = "1",
) -> None:
party_specs = [
("1", "Seller", "/Invoice/cac:AccountingSupplierParty/cac:Party"),
("2", "Buyer", "/Invoice/cac:AccountingCustomerParty/cac:Party"),
]
for d_party, role, party_xpath in party_specs:
parts = [p for p in party_xpath.split("/") if p]
party = invoice
for part in parts[1:]:
party = party.find(qname(part))
if party is None:
break
if party is None:
continue
row = new_empty_row(fieldnames, d_invoice)
row["dInvoiceParty"] = d_party
row["partyRole"] = role
name = party.find(f"{qname('cac:PartyName')}/{qname('cbc:Name')}")
tax_id = party.find(f"{qname('cac:PartyTaxScheme')}/{qname('cbc:CompanyID')}")
row["partyName"] = name.text if name is not None and name.text else ""
row["partyTaxID"] = tax_id.text if tax_id is not None and tax_id.text else ""
rows.append(row)
def append_document_reference_rows(
rows: list[dict[str, str]],
invoice: ET.Element,
fieldnames: list[str],
d_invoice: str = "1",
) -> None:
ref_specs = [
("Order", "/Invoice/cac:OrderReference/cbc:ID"),
("Delivery", "/Invoice/cac:DespatchDocumentReference/cbc:ID"),
]
ref_no = 1
for ref_type, ref_xpath in ref_specs:
ref_id = get_text(invoice, ref_xpath)
if ref_id:
row = new_empty_row(fieldnames, d_invoice)
row["dDocumentReference"] = str(ref_no)
row["referenceType"] = ref_type
row["referenceID"] = ref_id
rows.append(row)
ref_no += 1
def append_payment_row(
rows: list[dict[str, str]],
invoice: ET.Element,
fieldnames: list[str],
d_invoice: str = "1",
) -> None:
due_date = get_text(invoice, "/Invoice/cbc:DueDate")
payment_id = get_text(invoice, "/Invoice/cac:PaymentMeans/cbc:PaymentID")
if due_date or payment_id:
row = new_empty_row(fieldnames, d_invoice)
row["dPayment"] = "1"
row["paymentDueDate"] = due_date
row["paymentReference"] = payment_id
rows.append(row)
def append_tax_breakdown_rows(
rows: list[dict[str, str]],
invoice: ET.Element,
fieldnames: list[str],
d_invoice: str = "1",
) -> None:
tax_total = invoice.find(qname("cac:TaxTotal"))
if tax_total is None:
return
for i, subtotal in enumerate(tax_total.findall(qname("cac:TaxSubtotal")), start=1):
row = new_empty_row(fieldnames, d_invoice)
row["dTaxBreakdown"] = str(i)
taxable = subtotal.find(qname("cbc:TaxableAmount"))
tax_amount = subtotal.find(qname("cbc:TaxAmount"))
category = subtotal.find(qname("cac:TaxCategory"))
row["taxableAmount"] = taxable.text if taxable is not None and taxable.text else ""
row["taxAmount"] = tax_amount.text if tax_amount is not None and tax_amount.text else ""
if category is not None:
cat_id = category.find(qname("cbc:ID"))
percent = category.find(qname("cbc:Percent"))
row["taxCategoryCode"] = cat_id.text if cat_id is not None and cat_id.text else ""
row["taxRate"] = percent.text if percent is not None and percent.text else ""
rows.append(row)
def append_invoice_line_rows(
rows: list[dict[str, str]],
invoice: ET.Element,
fieldnames: list[str],
d_invoice: str = "1",
) -> None:
for i, line in enumerate(invoice.findall(qname("cac:InvoiceLine")), start=1):
row = new_empty_row(fieldnames, d_invoice)
row["dInvoiceLine"] = str(i)
line_id = line.find(qname("cbc:ID"))
quantity = line.find(qname("cbc:InvoicedQuantity"))
line_amount = line.find(qname("cbc:LineExtensionAmount"))
item = line.find(qname("cac:Item"))
price = line.find(qname("cac:Price"))
row["lineID"] = line_id.text if line_id is not None and line_id.text else ""
if quantity is not None:
row["invoicedQuantity"] = quantity.text or ""
row["unitCode"] = quantity.attrib.get("unitCode", "")
row["lineExtensionAmount"] = line_amount.text if line_amount is not None and line_amount.text else ""
if item is not None:
name = item.find(qname("cbc:Name"))
row["itemName"] = name.text if name is not None and name.text else ""
tax_category = item.find(qname("cac:ClassifiedTaxCategory"))
if tax_category is not None:
cat_id = tax_category.find(qname("cbc:ID"))
percent = tax_category.find(qname("cbc:Percent"))
row["lineTaxCategoryCode"] = cat_id.text if cat_id is not None and cat_id.text else ""
row["lineTaxRate"] = percent.text if percent is not None and percent.text else ""
if price is not None:
price_amount = price.find(qname("cbc:PriceAmount"))
row["unitPriceAmount"] = price_amount.text if price_amount is not None and price_amount.text else ""
rows.append(row)
def ubl_xml_to_csv_rows(
invoice: ET.Element,
fieldnames: list[str],
d_invoice: str = "1",
) -> list[dict[str, str]]:
rows: list[dict[str, str]] = []
append_header_row(rows, invoice, fieldnames, d_invoice)
append_party_rows(rows, invoice, fieldnames, d_invoice)
append_document_reference_rows(rows, invoice, fieldnames, d_invoice)
append_payment_row(rows, invoice, fieldnames, d_invoice)
append_tax_breakdown_rows(rows, invoice, fieldnames, d_invoice)
append_invoice_line_rows(rows, invoice, fieldnames, d_invoice)
return rows
# end::xml_to_csv[]
# tag::output[]
def rows_to_csv(rows: list[dict[str, str]], fieldnames: list[str]) -> str:
out = io.StringIO()
writer = csv.DictWriter(out, fieldnames=fieldnames, lineterminator="\n")
writer.writeheader()
writer.writerows(rows)
return out.getvalue()
def pretty_xml(elem: ET.Element) -> str:
ET.indent(elem, space=" ")
return ET.tostring(elem, encoding="unicode")
def write_xml(elem: ET.Element, path: str | Path) -> None:
tree = ET.ElementTree(elem)
ET.indent(tree, space=" ")
tree.write(path, encoding="utf-8", xml_declaration=True)
# end::output[]
# tag::main[]
def parse_args(argv: list[str]) -> argparse.Namespace:
parser = argparse.ArgumentParser(
description="Convert structured invoice CSV to a JP PINT UBL subset and back."
)
parser.add_argument("--input-csv", default="invoice.csv", help="structured invoice CSV")
parser.add_argument("--bindings-csv", default="bindings.csv", help="semantic_path,xpath binding CSV")
parser.add_argument("--xml-out", default="invoice.xml", help="output XML file")
parser.add_argument("--roundtrip-csv", default="roundtrip.csv", help="round-trip CSV output")
parser.add_argument("--d-invoice", default="1", help="target dInvoice value")
parser.add_argument("--print-xml", action="store_true", help="print generated XML to stdout")
parser.add_argument("--print-csv", action="store_true", help="print round-trip CSV to stdout")
return parser.parse_args(argv)
def main(argv: list[str] | None = None) -> int:
args = parse_args(sys.argv[1:] if argv is None else argv)
rows = read_core_csv_file(args.input_csv)
if not rows:
raise ValueError(f"No rows in input CSV: {args.input_csv}")
bindings = read_bindings_csv_file(args.bindings_csv)
fieldnames = list(rows[0].keys())
invoice_xml = csv_to_ubl_xml(rows, bindings, args.d_invoice)
write_xml(invoice_xml, args.xml_out)
roundtrip_rows = ubl_xml_to_csv_rows(invoice_xml, fieldnames, args.d_invoice)
Path(args.roundtrip_csv).write_text(
rows_to_csv(roundtrip_rows, fieldnames),
encoding="utf-8-sig",
)
if args.print_xml:
print("=== CSV -> JP PINT UBL subset ===")
print(pretty_xml(invoice_xml))
if args.print_csv:
print("=== JP PINT UBL subset -> structured CSV ===")
print(rows_to_csv(roundtrip_rows, fieldnames))
print(f"XML written: {args.xml_out}")
print(f"CSV written: {args.roundtrip_csv}")
return 0
if __name__ == "__main__":
raise SystemExit(main())
# end::main[]


コメントを残す