構文結合表CSVでつくる構造化CSVとJP PINT UBLの相互変換

Views: 1

1. はじめに

前回の記事では,日本版コアインボイスを構造化CSVとして表現し,JP PINT UBLとの対応を SemanticPath と XPath の構文結合表として整理した。

本稿では,その構文結合表を使って,構造化CSVとJP PINT UBL XMLを相互変換するPythonプログラムの考え方を紹介する。

今回の改訂では,説明用のPythonコードに埋め込んでいた請求書CSV及び構文結合表を,外部CSVファイルとして読み込む形に変更した。

ここで示すプログラムは,JP PINT UBL の完全な実装ではありません。今回同梱している bindings.csv も,代表的な定義項目の抜粋であり,JP PINT が必須としている項目をすべて含んでいるわけではありません。目的は,次の点をイメージとしてつかむことです。

  • CSV側の項目を SemanticPath で指定する。
  • XML側の項目を XPath で指定する。
  • 構文結合表を bindings.csv として外部化する。
  • プログラム本体は,構文結合表を読み取り,CSVからXMLを生成する。
  • 逆方向では,XMLからXPathで値を取得し,構造化CSVの行へ展開する。
  • 対応先の構文が変わっても,原則として結合定義CSVを変更すれば,同じ汎用プログラムを利用できる。

つまり,変換ロジックをプログラム内に直接書き込むのではなく,構文結合表として外部化することが重要である。

2. ファイル構成

今回のサンプルは,次の3ファイルを使う。

ダウンロード:syntax_binding_revised_package

ファイル 役割

invoice.csv

日本版コアインボイスを表す構造化CSV。

bindings.csv

構造化CSV側の SemanticPath と,JP PINT UBL側の XPath を対応付ける構文結合表。

syntax_binding.py

構造化CSVと構文結合表CSVを読み込み,JP PINT UBL XMLを生成し,さらにXMLから構造化CSVへ戻すサンプルプログラム。

PowerShell での実行例は次のとおりです。

py .\syntax_binding.py .\invoice.csv .\bindings.csv `
  --xml-out .\invoice.xml `
  --roundtrip-out .\roundtrip.csv

py が使えない環境では,python に置き換えてもかまいません。

この実行により,次のファイルが生成される。

出力ファイル 内容

invoice.xml

構造化CSVから生成したJP PINT UBL subset XML。

roundtrip.csv

生成したXMLを読み戻して再構成した構造化CSV。

3. 構造化CSVと構文結合表

3.1. 構造化CSVの例

本稿では,請求書構造化CSVを例にする。

dInvoice,dInvoiceParty,dDocumentReference,dPayment,dTaxBreakdown,dInvoiceLine,invoiceNumber,issueDate,invoiceTypeCode,currencyCode,totalAmount,totalTaxAmount,partyRole,partyName,partyTaxID,referenceType,referenceID,paymentDueDate,paymentMeansCode,paymentReference,taxCategoryCode,taxRate,taxableAmount,taxAmount,lineID,itemName,lineTaxCategoryCode,lineTaxRate,invoicedQuantity,unitCode,unitPriceAmount,lineExtensionAmount
1,,,,,,INV-26-861,2026-06-20,380,JPY,16802,1472,,,,,,,,,,,,,,,,,,,,
1,1,,,,,,,,,,,Seller,売手株式会社,T1234567890123,,,,,,,,,,,,,,,,,
1,2,,,,,,,,,,,Buyer,買手株式会社,,,,,,,,,,,,,,,,,,
1,,1,,,,,,,,,,,,,Order,PO-2026-0001,,,,,,,,,,,,,,,
1,,2,,,,,,,,,,,,,Delivery,DN-2026-0001,,,,,,,,,,,,,,,
1,,,1,,,,,,,,,,,,,,2026-07-31,30,INV-2026-0001,,,,,,,,,,,,
1,,,,1,,,,,,,,,,,,,,,,S,10,12320,1232,,,,,,,,
1,,,,1,,,,,,,,,,,,,,,,AA,8,3010,240,,,,,,,,
1,,,,,1,,,,,,,,,,,,,,,,,,,INV-26-861-1,商品A,S,10,10,EA,1000,10000
1,,,,,2,,,,,,,,,,,,,,,,,,,INV-26-861-2,商品B,S,10,4,EA,580,2320
1,,,,,3,,,,,,,,,,,,,,,,,,,INV-26-861-3,食品C,AA,8,7,EA,430,3010

このCSVでは,請求書ヘッダー,売手,買手,参照文書,支払情報,税率別内訳,請求明細を,1枚のCSVに格納している。

重要なのは,dInvoicePartydDocumentReferencedPaymentdTaxBreakdowndInvoiceLine などの列である。これらは,行がどの階層又は繰返し構造に属するかを示すディメンション列である。

例えば,次の行は売手を表す。

1,1,,,,,,,,,,,Seller,売手株式会社,T1234567890123,,,,,,,,,,,,,,,,,

この行では,dInvoiceParty1 が入り,partyRoleSeller が入っている。したがって,この行は「請求書当事者のうち,売手を表す行」と解釈できる。

3.2. 構文結合表CSV

構文結合表は,構造化CSV側の SemanticPath と,JP PINT UBL側の XPath を対応付ける表である。

今回の改訂では,この対応表をPythonコード中の BINDINGS リストではなく,次の bindings.csv として外部化した。

semantic_path,xpath
$.Invoice.invoiceNumber,/Invoice/cbc:ID
$.Invoice.issueDate,/Invoice/cbc:IssueDate
$.Invoice.invoiceTypeCode,/Invoice/cbc:InvoiceTypeCode
$.Invoice.currencyCode,/Invoice/cbc:DocumentCurrencyCode
$.Invoice.dInvoiceParty[?@.partyRole=Seller].partyName,/Invoice/cac:AccountingSupplierParty/cac:Party/cac:PartyName/cbc:Name
$.Invoice.dInvoiceParty[?@.partyRole=Seller].partyTaxID,/Invoice/cac:AccountingSupplierParty/cac:Party/cac:PartyTaxScheme/cbc:CompanyID
$.Invoice.dInvoiceParty[?@.partyRole=Buyer].partyName,/Invoice/cac:AccountingCustomerParty/cac:Party/cac:PartyName/cbc:Name
$.Invoice.dInvoiceParty[?@.partyRole=Buyer].partyTaxID,/Invoice/cac:AccountingCustomerParty/cac:Party/cac:PartyTaxScheme/cbc:CompanyID
$.Invoice.dDocumentReference[?@.referenceType=Order].referenceID,/Invoice/cac:OrderReference/cbc:ID
$.Invoice.dDocumentReference[?@.referenceType=Delivery].referenceID,/Invoice/cac:DespatchDocumentReference/cbc:ID
$.Invoice.paymentDueDate,/Invoice/cbc:DueDate
$.Invoice.paymentReference,/Invoice/cac:PaymentMeans/cbc:PaymentID
$.Invoice.totalTaxAmount,/Invoice/cac:TaxTotal/cbc:TaxAmount
$.Invoice.currencyCode,/Invoice/cac:TaxTotal/cbc:TaxAmount/@currencyID
$.Invoice.dTaxBreakdown.taxableAmount,/Invoice/cac:TaxTotal/cac:TaxSubtotal/cbc:TaxableAmount
$.Invoice.currencyCode,/Invoice/cac:TaxTotal/cac:TaxSubtotal/cbc:TaxableAmount/@currencyID
$.Invoice.dTaxBreakdown.taxAmount,/Invoice/cac:TaxTotal/cac:TaxSubtotal/cbc:TaxAmount
$.Invoice.currencyCode,/Invoice/cac:TaxTotal/cac:TaxSubtotal/cbc:TaxAmount/@currencyID
$.Invoice.dTaxBreakdown.taxCategoryCode,/Invoice/cac:TaxTotal/cac:TaxSubtotal/cac:TaxCategory/cbc:ID
$.Invoice.dTaxBreakdown.taxRate,/Invoice/cac:TaxTotal/cac:TaxSubtotal/cac:TaxCategory/cbc:Percent
const:VAT,/Invoice/cac:TaxTotal/cac:TaxSubtotal/cac:TaxCategory/cac:TaxScheme/cbc:ID

例えば,売手名称は次のように指定する。

$.Invoice.dInvoiceParty[?@.partyRole=Seller].partyName

これは,次の意味である。

dInvoiceParty の行のうち,
partyRole が Seller である行を選択し,
その行の partyName を取得する。

その値を,次のXPathで指定されるXML要素へ設定する。

/Invoice/cac:AccountingSupplierParty/cac:Party/cac:PartyName/cbc:Name

プログラムは,CSV列名とXML要素名の対応を直接知っているのではない。対応関係を bindings.csv から読み込み,SemanticPath と XPath を使って処理している。

3.3. bindings.csv の定義方法

bindings.csv は,単に「CSV の列名」と「XML の要素名」を対応付けるだけの表ではありません。構造化CSV側の論理定義と,XML側の構文定義を対応付ける表です。今回のサンプルでは,主に次の4種類の記法を使っています。

記法 意味

$.Invoice.invoiceNumber

請求書全体に1回だけ現れる単一項目です。

$.Invoice.dInvoiceParty[?@.partyRole=Seller].partyName

dInvoiceParty の行のうち,partyRole=Seller の行を選び,その partyName を使う条件付き定義です。

$.Invoice.dTaxBreakdown.taxAmount

dTaxBreakdown に値がある各行を,税率別内訳の繰返し階層として扱い,その行の taxAmount を使う定義です。

const:VAT

CSV から値を読むのではなく,固定値 VAT を XML に書き込む定義です。

固定値の指定は,実務でも有効な考え方です。たとえば,cac:TaxScheme/cbc:ID のように,対象仕様上は定数として扱える値を毎回 CSV に持たせる必要がない場合,const: を使うと構文結合表が簡潔になります。

3.4. dXXX と cac:XXX / cbc:ZZZ の対応

構造化CSVでは,dXXX 列に値があることが,その行がどの繰返し階層に属するかを示します。そして,その行で値を持つ各欄のデータが,その階層で定義された項目です。これが,階層構造を1枚のCSVで表す構造化CSVの基本的な考え方です。

XML 側では,CCTS の Aggregate Business Information Entity(ABIE)に対応する集約的な業務情報要素が cac:XXX で表され,Basic Business Information Entity(BBIE)に対応する基本要素が cbc:ZZZ で表されます。構造化CSVでは,この ABIE に対応する論理階層を dYYY ディメンションで表し,その階層に属する BBIE 相当の項目値を同じ行の列として記述します。

たとえば今回のサンプルでは,次のような対応関係を想定しています。

XML 側の集約要素 構造化CSV側のディメンション

cac:AccountingSupplierParty / cac:AccountingCustomerParty

dInvoiceParty

cac:OrderReference / cac:DespatchDocumentReference

dDocumentReference

cac:PaymentMeans

dPayment

cac:TaxSubtotal

dTaxBreakdown

cac:InvoiceLine

dInvoiceLine

このように見ると,XML 側の cac:XXX は,構造化CSVでは dYYY によって表される論理階層の構文表現だと理解できます。

4. Pythonプログラムの全体構成

サンプルプログラムは,大きく次の処理に分けられる。

処理 役割

CSV読込み

invoice.csvcsv.DictReader で行リストとして読み込む。

構文結合表読込み

bindings.csv から SemanticPath と XPath の対応を読み込む。

SemanticPath評価

CSV行リストから,指定された SemanticPath に対応する値を取得する。

XML生成

XPathに従ってXML要素を作成し,取得した値を設定する。

XML読込み

ElementTreeでXMLを読み込み,XPath相当のパスで値を取得する。

CSV生成

XMLから取得した値を,構造化CSVの行として再構成する。

本稿では,Python標準ライブラリの csvxml.etree.ElementTree を使用する。

4.1. CSVファイルを読み込む

構造化CSVは,Pythonの標準ライブラリ csv で読み込む。

from __future__ import annotations

import argparse
import csv
import io
import re
import xml.etree.ElementTree as ET
from pathlib import Path

def read_core_csv(text: str) -> list[dict[str, str]]:
    """構造化CSV文字列を、1行1辞書のリストへ変換します。"""
    return list(csv.DictReader(io.StringIO(text)))


def read_core_csv_file(path: str | Path, encoding: str = "utf-8-sig") -> list[dict[str, str]]:
    """構造化CSVファイルを読み込みます。Excel 保存時の BOM にも対応するため utf-8-sig を使います。"""
    return read_core_csv(Path(path).read_text(encoding=encoding))

read_core_csv_file() は,invoice.csv を読み込み,1行を1つの辞書として扱えるようにする。

[
    {
        "dInvoice": "1",
        "invoiceNumber": "INV-26-861",
        "issueDate": "2026-06-20",
    },
    {
        "dInvoice": "1",
        "dInvoiceParty": "1",
        "partyRole": "Seller",
        "partyName": "売手株式会社",
        "partyTaxID": "T1234567890123",
    },
]

1行が1つの辞書になり,CSVの列名をキーとして値を取得できる。

4.2. 構文結合表CSVを読み込む

bindings.csv は,semantic_pathxpath の2列を持つCSVである。

def read_bindings_csv(text: str) -> list[Binding]:
    """bindings.csv を読み込み、(semantic_path, xpath) のリストへ変換します。"""
    bindings: list[Binding] = []
    reader = csv.DictReader(io.StringIO(text))
    if reader.fieldnames is None:
        raise ValueError("Binding CSV has no header")

    fieldnames = [name.lstrip("\ufeff") for name in reader.fieldnames]
    if "semantic_path" not in fieldnames or "xpath" not in fieldnames:
        raise ValueError("Binding CSV must have semantic_path and xpath columns")

    for line_no, row in enumerate(reader, start=2):
        normalized = {k.lstrip("\ufeff"): (v or "").strip() for k, v in row.items()}
        semantic_path = normalized.get("semantic_path", "")
        xpath = normalized.get("xpath", "")
        if not semantic_path and not xpath:
            continue
        if not semantic_path or not xpath:
            raise ValueError(f"Incomplete binding at line {line_no}: {row!r}")
        if not xpath.startswith("/Invoice/") and xpath != "/Invoice":
            raise ValueError(f"XPath must start with /Invoice at line {line_no}: {xpath!r}")
        bindings.append((semantic_path, xpath))
    return bindings


def read_bindings_csv_file(path: str | Path, encoding: str = "utf-8-sig") -> list[Binding]:
    return read_bindings_csv(Path(path).read_text(encoding=encoding))

この関数により,構文結合表は次のようなタプルのリストになる。

[
    ("$.Invoice.invoiceNumber", "/Invoice/cbc:ID"),
    ("$.Invoice.dInvoiceParty[?@.partyRole=Seller].partyName",
     "/Invoice/cac:AccountingSupplierParty/cac:Party/cac:PartyName/cbc:Name"),
]

ここで重要なのは,対応関係がプログラム定数ではなく,外部ファイルとして差し替え可能になっていることである。

4.3. SemanticPathで値を取得する

本稿のサンプルでは,次の2種類のSemanticPathだけを扱う。

$.Invoice.field
$.Invoice.dimension[?@.filterField=filterValue].field

例えば,請求書番号は次の形式である。

$.Invoice.invoiceNumber

売手名称は次の形式である。

$.Invoice.dInvoiceParty[?@.partyRole=Seller].partyName

これを処理する関数は,次のとおりである。

def rows_by_dimension(rows: list[dict[str, str]], dimension: str, d_invoice: str = "1") -> list[dict[str, str]]:
    """dXXX に値がある行だけを集め、そのディメンション階層の行集合を返します。"""
    return [r for r in rows if r.get("dInvoice") == d_invoice and r.get(dimension)]


def first_value(rows: list[dict[str, str]], column: str, d_invoice: str = "1") -> str:
    for row in rows:
        if row.get("dInvoice") == d_invoice and row.get(column):
            return row[column]
    return ""


def direct_dimension_target(path: str) -> tuple[str, str] | None:
    """$.Invoice.dTaxBreakdown.taxAmount のような、dXXX 直下の項目定義を判定します。"""
    m = re.fullmatch(r"\$\.Invoice\.(d[A-Za-z_][A-Za-z0-9_]*)\.([A-Za-z_][A-Za-z0-9_]*)", path)
    if not m:
        return None
    return m.group(1), m.group(2)


def query_semantic_path(
    rows: list[dict[str, str]],
    path: str,
    d_invoice: str = "1",
    current_row: dict[str, str] | None = None,
) -> str:
    """
    扱う SemanticPath subset:
      const:VALUE
      $.Invoice.field
      $.Invoice.dDimension.field
      $.Invoice.dimension[?@.filterField="filterValue"].field
      $.Invoice.dimension[?@.filterField=filterValue].field
    """
    if path.startswith("const:"):
        return path.split(":", 1)[1]

    m = re.fullmatch(r"\$\.Invoice\.([A-Za-z_][A-Za-z0-9_]*)", path)
    if m:
        return first_value(rows, m.group(1), d_invoice)

    dim_target = direct_dimension_target(path)
    if dim_target:
        dimension, target_field = dim_target
        if current_row is not None and current_row.get(dimension):
            return current_row.get(target_field, "")
        repeated_rows = rows_by_dimension(rows, dimension, d_invoice)
        if repeated_rows:
            return repeated_rows[0].get(target_field, "")
        return ""

    m = re.fullmatch(
        r"\$\.Invoice\.([A-Za-z_][A-Za-z0-9_]*)"
        r"\[\?@\.([A-Za-z_][A-Za-z0-9_]*)=(?:\"([^\"]+)\"|([^\]]+))\]"
        r"\.([A-Za-z_][A-Za-z0-9_]*)",
        path,
    )
    if m:
        dimension, filter_field, quoted_value, bare_value, target_field = m.groups()
        filter_value = quoted_value if quoted_value is not None else bare_value
        for row in rows_by_dimension(rows, dimension, d_invoice):
            if row.get(filter_field) == filter_value:
                return row.get(target_field, "")
        return ""

    raise ValueError(f"Unsupported SemanticPath: {path}")

この関数は,汎用XPathや汎用JSONPathを実装しているわけではない。構文結合表で必要となる最小限の SemanticPath subset を実装している。

実務上も,最初からすべての式を許容するより,構文結合表で使用できる記法を限定した方が安全である。

4.4. ElementTreeでXMLを扱う

JP PINT UBLはXML名前空間を使用する。ElementTreeで名前空間付きXMLを扱うため,まず名前空間を定義する。

UBL_NS = "urn:oasis:names:specification:ubl:schema:xsd:Invoice-2"
CBC_NS = "urn:oasis:names:specification:ubl:schema:xsd:CommonBasicComponents-2"
CAC_NS = "urn:oasis:names:specification:ubl:schema:xsd:CommonAggregateComponents-2"

NS = {
    "": UBL_NS,
    "cbc": CBC_NS,
    "cac": CAC_NS,
}

# XML 出力時に見やすいプレフィックス付きで名前空間を登録する
for prefix, uri in NS.items():
    ET.register_namespace(prefix, uri)


def qname(name: str) -> str:
    """cbc:ID のような名前を ElementTree 用の QName へ変換します。"""
    if ":" in name:
        prefix, local = name.split(":", 1)
        return f"{{{NS[prefix]}}}{local}"
    return f"{{{UBL_NS}}}{name}"

ElementTreeでは,名前空間付き要素を次の形式で指定する。

{名前空間URI}ローカル名

例えば,cbc:ID は内部的には次の名前になる。

{urn:oasis:names:specification:ubl:schema:xsd:CommonBasicComponents-2}ID

毎回この文字列を書くのは煩雑なので,接頭辞付きの名前からElementTree用の名前へ変換する関数 qname() を用意している。

4.5. XPathに沿って要素を生成する

構文結合表には,次のようなXPathが定義されている。

/Invoice/cac:AccountingSupplierParty/cac:Party/cac:PartyName/cbc:Name

このパスに沿って,Invoiceに追加すべき要素を順番に作成する。

def normalize_xpath(xpath: str) -> str:
    """XPath 文字列の前後空白を除き、途中に空白が入っていないことを確認します。"""
    xpath = xpath.strip()
    if re.search(r"\s", xpath):
        raise ValueError(f"XPath contains whitespace: {xpath!r}")
    return xpath


def split_attr_path(xpath: str) -> tuple[str, str | None]:
    xpath = normalize_xpath(xpath)
    if "/@" in xpath:
        elem_path, attr_name = xpath.rsplit("/@", 1)
        return elem_path, attr_name
    return xpath, None


def path_parts(xpath: str) -> list[str]:
    elem_path, _ = split_attr_path(xpath)
    return [p for p in elem_path.split("/") if p]


def ensure_path(root: ET.Element, xpath: str) -> ET.Element:
    elem_path, _ = split_attr_path(xpath)
    parts = [p for p in elem_path.split("/") if p]
    if not parts or parts[0] != "Invoice":
        raise ValueError(f"XPath must start with /Invoice: {xpath}")

    elem = root
    for part in parts[1:]:
        tag = qname(part)
        child = elem.find(tag)
        if child is None:
            child = ET.SubElement(elem, tag)
        elem = child
    return elem


def ensure_relative_path(root: ET.Element, rel_xpath: str) -> ET.Element:
    elem_path, _ = split_attr_path(rel_xpath)
    parts = [p for p in elem_path.split("/") if p]
    elem = root
    for part in parts:
        tag = qname(part)
        child = elem.find(tag)
        if child is None:
            child = ET.SubElement(elem, tag)
        elem = child
    return elem


def set_text(root: ET.Element, xpath: str, value: str | None) -> None:
    if value in (None, ""):
        return
    elem_path, attr_name = split_attr_path(xpath)
    elem = ensure_path(root, elem_path)
    if attr_name:
        elem.set(attr_name, str(value))
    else:
        elem.text = str(value)


def set_text_relative(root: ET.Element, rel_xpath: str, value: str | None) -> None:
    if value in (None, ""):
        return
    elem_path, attr_name = split_attr_path(rel_xpath)
    elem = ensure_relative_path(root, elem_path)
    if attr_name:
        elem.set(attr_name, str(value))
    else:
        elem.text = str(value)


def get_text(root: ET.Element, xpath: str) -> str:
    elem_path, attr_name = split_attr_path(xpath)
    parts = [p for p in elem_path.split("/") if p]
    if not parts or parts[0] != "Invoice":
        raise ValueError(f"XPath must start with /Invoice: {xpath}")
    elem = root
    for part in parts[1:]:
        elem = elem.find(qname(part))
        if elem is None:
            return ""
    if attr_name:
        return elem.attrib.get(attr_name, "")
    return elem.text or ""


def get_text_relative(root: ET.Element, rel_xpath: str) -> str:
    elem_path, attr_name = split_attr_path(rel_xpath)
    parts = [p for p in elem_path.split("/") if p]
    elem = root
    for part in parts:
        elem = elem.find(qname(part))
        if elem is None:
            return ""
    if attr_name:
        return elem.attrib.get(attr_name, "")
    return elem.text or ""

ensure_path() は,指定されたXPathの最後の要素を返す。

例えば,次の呼び出しを行う。

elem = ensure_path(
    invoice,
    "/Invoice/cac:AccountingSupplierParty/cac:Party/cac:PartyName/cbc:Name"
)
elem.text = "売手株式会社"

これにより,次のようなXML要素が生成される。

<cac:AccountingSupplierParty>
  <cac:Party>
    <cac:PartyName>
      <cbc:Name>売手株式会社</cbc:Name>
    </cac:PartyName>
  </cac:Party>
</cac:AccountingSupplierParty>

4.6. 構文結合表を使ってCSVからXMLを作る

CSVからXMLへの変換は,単純項目については次の3段階で処理できる。

1. SemanticPathでCSVから値を取得する。
2. XPathでXML上の設定先を決める。
3. 取得した値をXML要素に設定する。
def csv_to_ubl_xml(rows: list[dict[str, str]], bindings: list[Binding], d_invoice: str = "1") -> ET.Element:
    """構造化CSVを XML へ変換します。単一項目は binding で、明細行は補助処理で出力します。"""
    invoice = ET.Element(qname("Invoice"))
    # binding 定義を見て、dXXX に対応する XML の反復階層を特定する
    repeat_defs = infer_repeat_definitions(bindings)

    for semantic_path, xpath in scalar_bindings(bindings, repeat_defs):
        value = query_semantic_path(rows, semantic_path, d_invoice)
        set_text(invoice, xpath, value)

    for dimension, repeat_path in repeat_defs.items():
        apply_repeating_section(invoice, rows, bindings, dimension, repeat_path, d_invoice)

    # 明細行はこのサンプルでは補助処理で出力している
    append_invoice_lines(invoice, rows, first_value(rows, "currencyCode", d_invoice), d_invoice)
    return invoice

この処理では,プログラム本体は「売手はこのXML要素に入れる」「買手はこのXML要素に入れる」といった業務固有の対応を知らない。

対応関係は bindings.csv に定義されている。したがって,別のXML構文へ変換したい場合でも,基本的には bindings.csv を差し替えればよい。

4.7. 繰返し構造の扱い

請求書番号や発行日は,1つの請求書に1回だけ現れる項目である。

一方,税率別内訳や請求明細は繰返し構造である。

構造 CSV側 XML側

請求書ヘッダー

invoiceNumber

cbc:ID

当事者

dInvoiceParty

cac:AccountingSupplierParty

税率別内訳

dTaxBreakdown

cac:TaxSubtotal

請求明細

dInvoiceLine

cac:InvoiceLine

このため,サンプルプログラムでは,単純項目は bindings.csv で処理し,dTaxBreakdowndInvoiceLine は繰返し生成規則として処理している。

4.8. 税率別内訳をXMLへ変換する

構造化CSVでは,税率別内訳は dTaxBreakdown に値がある行として表される。

def apply_repeating_section(
    invoice: ET.Element,
    rows: list[dict[str, str]],
    bindings: list[Binding],
    dimension: str,
    repeat_path: str,
    d_invoice: str = "1",
) -> None:
    # 同じ繰返し階層に属する binding 群を集める
    section_bindings = bindings_for_repeat_path(bindings, repeat_path)
    if not section_bindings:
        return

    repeat_rows = rows_by_dimension(rows, dimension, d_invoice)
    if not repeat_rows:
        return

    repeat_parts = path_parts(repeat_path)
    if len(repeat_parts) < 2 or repeat_parts[0] != "Invoice":
        raise ValueError(f"Invalid repeat path: {repeat_path}")

    container_path = "/" + "/".join(repeat_parts[:-1])
    repeat_tag = repeat_parts[-1]
    container = ensure_path(invoice, container_path)

    marker_path = repeat_path + "/"
    # dXXX ごとの各行を、対応する XML の反復要素として出力する
    for repeat_row in repeat_rows:
        repeat_elem = ET.SubElement(container, qname(repeat_tag))
        for semantic_path, xpath in section_bindings:
            _, suffix = xpath.split(marker_path, 1)
            value = query_semantic_path(rows, semantic_path, d_invoice, current_row=repeat_row)
            set_text_relative(repeat_elem, suffix, value)

この処理は,次のように考えられる。

dTaxBreakdown の各行を取り出す。
各行について cac:TaxSubtotal を作成する。
taxableAmount を cbc:TaxableAmount へ設定する。
taxAmount を cbc:TaxAmount へ設定する。
taxCategoryCode と taxRate を cac:TaxCategory へ設定する。

4.9. 請求明細をXMLへ変換する

請求明細は,dInvoiceLine に値がある行として表される。

def append_invoice_lines(invoice: ET.Element, rows: list[dict[str, str]], currency: str, d_invoice: str = "1") -> None:
    """請求明細行を XML へ展開します。現状は結合表の完全自動化対象外です。"""
    for line_row in rows_by_dimension(rows, "dInvoiceLine", d_invoice):
        line = ET.SubElement(invoice, qname("cac:InvoiceLine"))
        ET.SubElement(line, qname("cbc:ID")).text = line_row["lineID"]
        ET.SubElement(line, qname("cbc:InvoicedQuantity"), {"unitCode": line_row.get("unitCode") or "EA"}).text = line_row["invoicedQuantity"]
        ET.SubElement(line, qname("cbc:LineExtensionAmount"), {"currencyID": currency}).text = line_row["lineExtensionAmount"]

        item = ET.SubElement(line, qname("cac:Item"))
        ET.SubElement(item, qname("cbc:Name")).text = line_row["itemName"]
        if line_row.get("lineTaxCategoryCode"):
            tax_category = ET.SubElement(item, qname("cac:ClassifiedTaxCategory"))
            ET.SubElement(tax_category, qname("cbc:ID")).text = line_row["lineTaxCategoryCode"]
            ET.SubElement(tax_category, qname("cbc:Percent")).text = line_row["lineTaxRate"]
            scheme = ET.SubElement(tax_category, qname("cac:TaxScheme"))
            ET.SubElement(scheme, qname("cbc:ID")).text = "VAT"

        price = ET.SubElement(line, qname("cac:Price"))
        ET.SubElement(price, qname("cbc:PriceAmount"), {"currencyID": currency}).text = line_row["unitPriceAmount"]

構造化CSV側では,請求明細は1行で表されている。

1,,,,,1,,,,,,,,,,,,,,,,,,INV-26-861-1,商品A,S,10,10,EA,1000,10000

これが,XMLでは cac:InvoiceLinecac:Itemcac:Price などの階層要素として展開される。

5. XMLからCSVを生成する

逆方向では,XPathに対応するXML要素を探し,その値をCSV列へ戻す。

def append_header_row(rows: list[dict[str, str]], invoice: ET.Element, fieldnames: list[str], d_invoice: str = "1") -> None:
    row = new_empty_row(fieldnames, d_invoice)
    row["invoiceNumber"] = get_text(invoice, "/Invoice/cbc:ID")
    row["issueDate"] = get_text(invoice, "/Invoice/cbc:IssueDate")
    row["invoiceTypeCode"] = get_text(invoice, "/Invoice/cbc:InvoiceTypeCode")
    row["currencyCode"] = get_text(invoice, "/Invoice/cbc:DocumentCurrencyCode")
    row["totalTaxAmount"] = get_text(invoice, "/Invoice/cac:TaxTotal/cbc:TaxAmount")
    rows.append(row)


def append_party_rows(rows: list[dict[str, str]], invoice: ET.Element, fieldnames: list[str], d_invoice: str = "1") -> None:
    party_specs = [
        ("1", "Seller", "/Invoice/cac:AccountingSupplierParty/cac:Party"),
        ("2", "Buyer", "/Invoice/cac:AccountingCustomerParty/cac:Party"),
    ]
    for d_party, role, party_xpath in party_specs:
        parts = [p for p in party_xpath.split("/") if p]
        party = invoice
        for part in parts[1:]:
            party = party.find(qname(part))
            if party is None:
                break
        if party is None:
            continue
        row = new_empty_row(fieldnames, d_invoice)
        row["dInvoiceParty"] = d_party
        row["partyRole"] = role
        name = party.find(f"{qname('cac:PartyName')}/{qname('cbc:Name')}")
        tax_id = party.find(f"{qname('cac:PartyTaxScheme')}/{qname('cbc:CompanyID')}")
        row["partyName"] = name.text if name is not None and name.text else ""
        row["partyTaxID"] = tax_id.text if tax_id is not None and tax_id.text else ""
        rows.append(row)


def append_document_reference_rows(rows: list[dict[str, str]], invoice: ET.Element, fieldnames: list[str], d_invoice: str = "1") -> None:
    ref_specs = [
        ("Order", "/Invoice/cac:OrderReference/cbc:ID"),
        ("Delivery", "/Invoice/cac:DespatchDocumentReference/cbc:ID"),
    ]
    ref_no = 1
    for ref_type, ref_xpath in ref_specs:
        ref_id = get_text(invoice, ref_xpath)
        if ref_id:
            row = new_empty_row(fieldnames, d_invoice)
            row["dDocumentReference"] = str(ref_no)
            row["referenceType"] = ref_type
            row["referenceID"] = ref_id
            rows.append(row)
            ref_no += 1


def append_payment_row(rows: list[dict[str, str]], invoice: ET.Element, fieldnames: list[str], d_invoice: str = "1") -> None:
    due_date = get_text(invoice, "/Invoice/cbc:DueDate")
    payment_id = get_text(invoice, "/Invoice/cac:PaymentMeans/cbc:PaymentID")
    if due_date or payment_id:
        row = new_empty_row(fieldnames, d_invoice)
        row["dPayment"] = "1"
        row["paymentDueDate"] = due_date
        row["paymentReference"] = payment_id
        rows.append(row)


def append_repeating_rows_from_bindings(
    rows: list[dict[str, str]],
    invoice: ET.Element,
    fieldnames: list[str],
    bindings: list[Binding],
    d_invoice: str = "1",
) -> None:
    # binding 定義を見て、dXXX に対応する XML の反復階層を特定する
    repeat_defs = infer_repeat_definitions(bindings)
    for dimension, repeat_path in repeat_defs.items():
        # 同じ繰返し階層に属する binding 群を集める
        section_bindings = bindings_for_repeat_path(bindings, repeat_path)
        repeat_parts = path_parts(repeat_path)
        if len(repeat_parts) < 2 or repeat_parts[0] != "Invoice":
            continue

        parent_path = "/" + "/".join(repeat_parts[:-1])
        repeat_tag = repeat_parts[-1]
        parent_elem = invoice
        for part in path_parts(parent_path)[1:]:
            parent_elem = parent_elem.find(qname(part))
            if parent_elem is None:
                break
        if parent_elem is None:
            continue

        # XML 側の反復要素を順に読み、構造化CSVの dXXX 行へ戻す
        repeat_elems = parent_elem.findall(qname(repeat_tag))
        marker_path = repeat_path + "/"
        for i, repeat_elem in enumerate(repeat_elems, start=1):
            row = new_empty_row(fieldnames, d_invoice)
            row[dimension] = str(i)
            for semantic_path, xpath in section_bindings:
                dim_target = direct_dimension_target(semantic_path)
                if not dim_target or dim_target[0] != dimension:
                    continue
                target_field = dim_target[1]
                _, suffix = xpath.split(marker_path, 1)
                row[target_field] = get_text_relative(repeat_elem, suffix)
            rows.append(row)


def append_invoice_line_rows(rows: list[dict[str, str]], invoice: ET.Element, fieldnames: list[str], d_invoice: str = "1") -> None:
    for i, line in enumerate(invoice.findall(qname("cac:InvoiceLine")), start=1):
        row = new_empty_row(fieldnames, d_invoice)
        row["dInvoiceLine"] = str(i)
        line_id = line.find(qname("cbc:ID"))
        quantity = line.find(qname("cbc:InvoicedQuantity"))
        line_amount = line.find(qname("cbc:LineExtensionAmount"))
        item = line.find(qname("cac:Item"))
        price = line.find(qname("cac:Price"))
        row["lineID"] = line_id.text if line_id is not None and line_id.text else ""
        if quantity is not None:
            row["invoicedQuantity"] = quantity.text or ""
            row["unitCode"] = quantity.attrib.get("unitCode", "")
        row["lineExtensionAmount"] = line_amount.text if line_amount is not None and line_amount.text else ""
        if item is not None:
            name = item.find(qname("cbc:Name"))
            row["itemName"] = name.text if name is not None and name.text else ""
            tax_category = item.find(qname("cac:ClassifiedTaxCategory"))
            if tax_category is not None:
                cat_id = tax_category.find(qname("cbc:ID"))
                percent = tax_category.find(qname("cbc:Percent"))
                row["lineTaxCategoryCode"] = cat_id.text if cat_id is not None and cat_id.text else ""
                row["lineTaxRate"] = percent.text if percent is not None and percent.text else ""
        if price is not None:
            price_amount = price.find(qname("cbc:PriceAmount"))
            row["unitPriceAmount"] = price_amount.text if price_amount is not None and price_amount.text else ""
        rows.append(row)


def ubl_xml_to_csv_rows(invoice: ET.Element, fieldnames: list[str], bindings: list[Binding], d_invoice: str = "1") -> list[dict[str, str]]:
    """XML を読み、構造化CSVの行リストへ戻します。"""
    rows: list[dict[str, str]] = []
    append_header_row(rows, invoice, fieldnames, d_invoice)
    append_party_rows(rows, invoice, fieldnames, d_invoice)
    append_document_reference_rows(rows, invoice, fieldnames, d_invoice)
    append_payment_row(rows, invoice, fieldnames, d_invoice)
    append_repeating_rows_from_bindings(rows, invoice, fieldnames, bindings, d_invoice)
    append_invoice_line_rows(rows, invoice, fieldnames, d_invoice)
    return rows

この処理により,XML上のヘッダー,当事者,参照文書,支払情報,税率別内訳,請求明細が,構造化CSVの行として再構成される。

6. 出力処理とmain関数

最後に,XMLとCSVを書き出す。

def rows_to_csv(rows: list[dict[str, str]], fieldnames: list[str]) -> str:
    out = io.StringIO()
    writer = csv.DictWriter(out, fieldnames=fieldnames, lineterminator="\n")
    writer.writeheader()
    writer.writerows(rows)
    return out.getvalue()


def write_xml(elem: ET.Element, path: str | Path) -> None:
    tree = ET.ElementTree(elem)
    ET.indent(tree, space="  ")
    tree.write(path, encoding="utf-8", xml_declaration=True)

コマンドライン引数の処理は次のとおりである。

def main() -> None:
    parser = argparse.ArgumentParser(
        description="構造化CSVと簡略化した UBL Invoice XML の相互変換サンプル",
        epilog="例: py .\\syntax_binding.py .\\invoice.csv .\\bindings.csv --xml-out .\\invoice.xml --roundtrip-out .\\roundtrip.csv",
    )
    parser.add_argument("invoice_csv", type=Path, help="入力となる構造化CSVファイル")
    parser.add_argument("bindings_csv", type=Path, help="Semantic path と XPath を対応付けた binding CSV")
    parser.add_argument("--xml-out", type=Path, default=Path("invoice.xml"), help="生成する XML の出力先")
    parser.add_argument("--roundtrip-out", type=Path, default=Path("roundtrip.csv"), help="XML を読み戻して再構成した CSV の出力先")
    args = parser.parse_args()

    # 1) 入力 CSV と binding CSV を読む
    rows = read_core_csv_file(args.invoice_csv)
    bindings = read_bindings_csv_file(args.bindings_csv)
    # 2) CSV -> XML 変換
    invoice = csv_to_ubl_xml(rows, bindings)
    write_xml(invoice, args.xml_out)

    # 3) XML -> CSV の逆変換を行い、roundtrip.csv を出力する
    fieldnames = list(rows[0].keys()) if rows else []
    roundtrip_rows = ubl_xml_to_csv_rows(invoice, fieldnames, bindings)
    args.roundtrip_out.write_text(rows_to_csv(roundtrip_rows, fieldnames), encoding="utf-8")

7. 実行結果

実行すると,invoice.xmlroundtrip.csv が作成される。

python3 syntax_binding.py \
  --input-csv invoice.csv \
  --bindings-csv bindings.csv \
  --xml-out invoice.xml \
  --roundtrip-csv roundtrip.csv \
  --print-xml

生成されるXMLの一部は次のようになる。

<?xml version='1.0' encoding='utf-8'?>
<Invoice xmlns="urn:oasis:names:specification:ubl:schema:xsd:Invoice-2" xmlns:cac="urn:oasis:names:specification:ubl:schema:xsd:CommonAggregateComponents-2" xmlns:cbc="urn:oasis:names:specification:ubl:schema:xsd:CommonBasicComponents-2">
  <cbc:ID>INV-26-861</cbc:ID>
  <cbc:IssueDate>2026-06-20</cbc:IssueDate>
  <cbc:InvoiceTypeCode>380</cbc:InvoiceTypeCode>
  <cbc:DocumentCurrencyCode>JPY</cbc:DocumentCurrencyCode>
  <cac:AccountingSupplierParty>
    <cac:Party>
      <cac:PartyName>
        <cbc:Name>売手株式会社</cbc:Name>
      </cac:PartyName>
      <cac:PartyTaxScheme>
        <cbc:CompanyID>T1234567890123</cbc:CompanyID>
      </cac:PartyTaxScheme>
    </cac:Party>
  </cac:AccountingSupplierParty>
  <cac:AccountingCustomerParty>
    <cac:Party>
      <cac:PartyName>
        <cbc:Name>買手株式会社</cbc:Name>
      </cac:PartyName>
    </cac:Party>
  </cac:AccountingCustomerParty>
  <cac:OrderReference>
    <cbc:ID>PO-2026-0001</cbc:ID>
  </cac:OrderReference>
  <cac:DespatchDocumentReference>
    <cbc:ID>DN-2026-0001</cbc:ID>
  </cac:DespatchDocumentReference>
  <cbc:DueDate>2026-07-31</cbc:DueDate>
  <cac:PaymentMeans>
    <cbc:PaymentID>INV-2026-0001</cbc:PaymentID>
  </cac:PaymentMeans>
  <cac:TaxTotal>
    <cbc:TaxAmount currencyID="JPY">1472</cbc:TaxAmount>

XMLを読み戻して生成したCSVは次のようになる。

dInvoice,dInvoiceParty,dDocumentReference,dPayment,dTaxBreakdown,dInvoiceLine,invoiceNumber,issueDate,invoiceTypeCode,currencyCode,totalAmount,totalTaxAmount,partyRole,partyName,partyTaxID,referenceType,referenceID,paymentDueDate,paymentMeansCode,paymentReference,taxCategoryCode,taxRate,taxableAmount,taxAmount,lineID,itemName,lineTaxCategoryCode,lineTaxRate,invoicedQuantity,unitCode,unitPriceAmount,lineExtensionAmount
1,,,,,,INV-26-861,2026-06-20,380,JPY,,1472,,,,,,,,,,,,,,,,,,,,
1,1,,,,,,,,,,,Seller,売手株式会社,T1234567890123,,,,,,,,,,,,,,,,,
1,2,,,,,,,,,,,Buyer,買手株式会社,,,,,,,,,,,,,,,,,,
1,,1,,,,,,,,,,,,,Order,PO-2026-0001,,,,,,,,,,,,,,,
1,,2,,,,,,,,,,,,,Delivery,DN-2026-0001,,,,,,,,,,,,,,,
1,,,1,,,,,,,,,,,,,,2026-07-31,,INV-2026-0001,,,,,,,,,,,,
1,,,,1,,,,,,,,,,,,,,,,S,10,12320,1232,,,,,,,,
1,,,,2,,,,,,,,,,,,,,,,AA,8,3010,240,,,,,,,,
1,,,,,1,,,,,,,,,,,,,,,,,,,INV-26-861-1,商品A,S,10,10,EA,1000,10000
1,,,,,2,,,,,,,,,,,,,,,,,,,INV-26-861-2,商品B,S,10,4,EA,580,2320
1,,,,,3,,,,,,,,,,,,,,,,,,,INV-26-861-3,食品C,AA,8,7,EA,430,3010

8. プログラマ向け補足

8.1. なぜ構文結合表をCSVに出すのか

構文結合表をPythonコードに埋め込むと,対応先のXML構文が変わるたびにプログラムを修正する必要がある。

一方,bindings.csv として外部化すれば,プログラム本体は次の汎用処理に集中できる。

CSVを読み込む。
SemanticPathで値を取得する。
XPathでXML要素を作る。
値を設定する。

この方式では,別のXML構文又はJSON構文へ対応する場合にも,まず変更対象となるのは結合表である。

8.2. 限定されたSemanticPath subset

このサンプルでは,SemanticPathを意図的に限定している。

$.Invoice.field
$.Invoice.dimension[?@.filterField=filterValue].field

これは,構文結合表で実際に必要な指定だけを安全に扱うためである。汎用式をすべて許可すると,実装が複雑になり,検証も難しくなる。

8.3. 繰返し構造は別規則にしている

dTaxBreakdowndInvoiceLine は,単一の値を単一のXPathへ写すだけでは表現できない。

そのため,本サンプルでは,単純項目は bindings.csv で処理し,繰返し構造は append_tax_total()append_invoice_lines() で処理している。

実運用では,繰返し構造の生成規則も結合定義側へ外部化していくことが次の課題になる。

9. おわりに

構文結合表を用いると,変換プログラムの考え方が変わる。

従来は,変換元項目と変換先項目をプログラム中に直接書き込むことが多かった。

しかし,SemanticPath と XPath の対応を構文結合表として定義すれば,プログラム本体は,次の汎用処理に集中できる。

SemanticPathで値を取得する。
XPathでXML要素を作る。
値を設定する。
逆方向では,XPathで値を取得し,構造化CSV行へ戻す。

今回の改訂では,その第一歩として,請求書CSVと構文結合表を外部CSVとして読み込む形にした。

日本版コアインボイス(構造化CSV)は,JP PINTを置き換えるものではない。JP PINT UBL,既存EDI,ERP,会計システムの間に入り,それぞれの構文を共通の意味に結び付けるための中間的なセマンティックモデルである。

その実装イメージをつかむうえで,今回のような小さなPythonプログラムは有効である。

10. 参考:サンプルソース

"""
syntax_binding.py

構造化CSV(Japan Core Invoice の説明用サンプル)と
簡略化した UBL 2.1 Invoice XML の相互変換を行う Python スクリプトです。

設計・執筆:
  三分一信之(三分一技術士事務所)

作成日: 2026-07-02
改訂日: 2026-07-02

このスクリプトで示したいこと
1. 変換仕様をプログラム本体に埋め込むのではなく、bindings.csv に外出しできること
2. 構造化CSV側の意味位置を Semantic path で指定できること
3. XML 側の構文位置を XPath で指定できること
4. dXXX 列に値がある行を、繰返しディメンション(階層)として扱えること
5. XML → CSV の逆変換でも、同じ結合定義の考え方を使えること

このサンプルの前提
- invoice.csv は、JP PINT の完全実装データではなく、説明用の概念サンプルです。
- bindings.csv も、代表的な定義項目の抜粋であり、JP PINT 必須項目をすべて含む完全版ではありません。
- 税率別内訳(dTaxBreakdown)は、構文結合表だけで処理できるようにしています。
- 明細行(cac:InvoiceLine)は、なお Python 側の補助処理を残しています。

主なファイル
- invoice.csv       : 構造化CSVの入力データ
- bindings.csv      : Semantic path と XPath の対応表
- invoice.xml       : 生成される XML
- roundtrip.csv     : XML を読み戻して再構成した CSV

実行方法(Windows PowerShell)
1) カレントフォルダを、このスクリプトと invoice.csv / bindings.csv がある場所へ移動します。
2) そのうえで、次を実行します。

    py .\syntax_binding.py .\invoice.csv .\bindings.csv

出力先を指定する場合は、次のようにします。

    py .\syntax_binding.py .\invoice.csv .\bindings.csv `
      --xml-out .\invoice.xml `
      --roundtrip-out .\roundtrip.csv

Python Launcher が使えない場合は、python コマンドでも構いません。

    python .\syntax_binding.py .\invoice.csv .\bindings.csv

実行結果
- 指定した XML 出力先に invoice.xml が生成されます。
- 指定した CSV 出力先に roundtrip.csv が生成されます。

注意
このスクリプトは、構文結合表の考え方を説明するためのサンプルです。
本番運用では、JP PINT の完全な必須項目、コードリスト、業務規則、妥当性検証を
追加していく必要があります。
"""

# tag::imports[]
from __future__ import annotations

import argparse
import csv
import io
import re
import xml.etree.ElementTree as ET
from pathlib import Path
# end::imports[]

# 構文結合表の1行: (Semantic path, XPath)
Binding = tuple[str, str]

# tag::namespace[]
UBL_NS = "urn:oasis:names:specification:ubl:schema:xsd:Invoice-2"
CBC_NS = "urn:oasis:names:specification:ubl:schema:xsd:CommonBasicComponents-2"
CAC_NS = "urn:oasis:names:specification:ubl:schema:xsd:CommonAggregateComponents-2"

NS = {
    "": UBL_NS,
    "cbc": CBC_NS,
    "cac": CAC_NS,
}

# XML 出力時に見やすいプレフィックス付きで名前空間を登録する
for prefix, uri in NS.items():
    ET.register_namespace(prefix, uri)


def qname(name: str) -> str:
    """cbc:ID のような名前を ElementTree 用の QName へ変換します。"""
    if ":" in name:
        prefix, local = name.split(":", 1)
        return f"{{{NS[prefix]}}}{local}"
    return f"{{{UBL_NS}}}{name}"
# end::namespace[]

# tag::read_csv[]
def read_core_csv(text: str) -> list[dict[str, str]]:
    """構造化CSV文字列を、1行1辞書のリストへ変換します。"""
    return list(csv.DictReader(io.StringIO(text)))


def read_core_csv_file(path: str | Path, encoding: str = "utf-8-sig") -> list[dict[str, str]]:
    """構造化CSVファイルを読み込みます。Excel 保存時の BOM にも対応するため utf-8-sig を使います。"""
    return read_core_csv(Path(path).read_text(encoding=encoding))
# end::read_csv[]

# tag::binding_read[]
def read_bindings_csv(text: str) -> list[Binding]:
    """bindings.csv を読み込み、(semantic_path, xpath) のリストへ変換します。"""
    bindings: list[Binding] = []
    reader = csv.DictReader(io.StringIO(text))
    if reader.fieldnames is None:
        raise ValueError("Binding CSV has no header")

    fieldnames = [name.lstrip("\ufeff") for name in reader.fieldnames]
    if "semantic_path" not in fieldnames or "xpath" not in fieldnames:
        raise ValueError("Binding CSV must have semantic_path and xpath columns")

    for line_no, row in enumerate(reader, start=2):
        normalized = {k.lstrip("\ufeff"): (v or "").strip() for k, v in row.items()}
        semantic_path = normalized.get("semantic_path", "")
        xpath = normalized.get("xpath", "")
        if not semantic_path and not xpath:
            continue
        if not semantic_path or not xpath:
            raise ValueError(f"Incomplete binding at line {line_no}: {row!r}")
        if not xpath.startswith("/Invoice/") and xpath != "/Invoice":
            raise ValueError(f"XPath must start with /Invoice at line {line_no}: {xpath!r}")
        bindings.append((semantic_path, xpath))
    return bindings


def read_bindings_csv_file(path: str | Path, encoding: str = "utf-8-sig") -> list[Binding]:
    return read_bindings_csv(Path(path).read_text(encoding=encoding))
# end::binding_read[]

# tag::xml_path[]
def normalize_xpath(xpath: str) -> str:
    """XPath 文字列の前後空白を除き、途中に空白が入っていないことを確認します。"""
    xpath = xpath.strip()
    if re.search(r"\s", xpath):
        raise ValueError(f"XPath contains whitespace: {xpath!r}")
    return xpath


def split_attr_path(xpath: str) -> tuple[str, str | None]:
    xpath = normalize_xpath(xpath)
    if "/@" in xpath:
        elem_path, attr_name = xpath.rsplit("/@", 1)
        return elem_path, attr_name
    return xpath, None


def path_parts(xpath: str) -> list[str]:
    elem_path, _ = split_attr_path(xpath)
    return [p for p in elem_path.split("/") if p]


def ensure_path(root: ET.Element, xpath: str) -> ET.Element:
    elem_path, _ = split_attr_path(xpath)
    parts = [p for p in elem_path.split("/") if p]
    if not parts or parts[0] != "Invoice":
        raise ValueError(f"XPath must start with /Invoice: {xpath}")

    elem = root
    for part in parts[1:]:
        tag = qname(part)
        child = elem.find(tag)
        if child is None:
            child = ET.SubElement(elem, tag)
        elem = child
    return elem


def ensure_relative_path(root: ET.Element, rel_xpath: str) -> ET.Element:
    elem_path, _ = split_attr_path(rel_xpath)
    parts = [p for p in elem_path.split("/") if p]
    elem = root
    for part in parts:
        tag = qname(part)
        child = elem.find(tag)
        if child is None:
            child = ET.SubElement(elem, tag)
        elem = child
    return elem


def set_text(root: ET.Element, xpath: str, value: str | None) -> None:
    if value in (None, ""):
        return
    elem_path, attr_name = split_attr_path(xpath)
    elem = ensure_path(root, elem_path)
    if attr_name:
        elem.set(attr_name, str(value))
    else:
        elem.text = str(value)


def set_text_relative(root: ET.Element, rel_xpath: str, value: str | None) -> None:
    if value in (None, ""):
        return
    elem_path, attr_name = split_attr_path(rel_xpath)
    elem = ensure_relative_path(root, elem_path)
    if attr_name:
        elem.set(attr_name, str(value))
    else:
        elem.text = str(value)


def get_text(root: ET.Element, xpath: str) -> str:
    elem_path, attr_name = split_attr_path(xpath)
    parts = [p for p in elem_path.split("/") if p]
    if not parts or parts[0] != "Invoice":
        raise ValueError(f"XPath must start with /Invoice: {xpath}")
    elem = root
    for part in parts[1:]:
        elem = elem.find(qname(part))
        if elem is None:
            return ""
    if attr_name:
        return elem.attrib.get(attr_name, "")
    return elem.text or ""


def get_text_relative(root: ET.Element, rel_xpath: str) -> str:
    elem_path, attr_name = split_attr_path(rel_xpath)
    parts = [p for p in elem_path.split("/") if p]
    elem = root
    for part in parts:
        elem = elem.find(qname(part))
        if elem is None:
            return ""
    if attr_name:
        return elem.attrib.get(attr_name, "")
    return elem.text or ""
# end::xml_path[]

# tag::semantic_path[]
def rows_by_dimension(rows: list[dict[str, str]], dimension: str, d_invoice: str = "1") -> list[dict[str, str]]:
    """dXXX に値がある行だけを集め、そのディメンション階層の行集合を返します。"""
    return [r for r in rows if r.get("dInvoice") == d_invoice and r.get(dimension)]


def first_value(rows: list[dict[str, str]], column: str, d_invoice: str = "1") -> str:
    for row in rows:
        if row.get("dInvoice") == d_invoice and row.get(column):
            return row[column]
    return ""


def direct_dimension_target(path: str) -> tuple[str, str] | None:
    """$.Invoice.dTaxBreakdown.taxAmount のような、dXXX 直下の項目定義を判定します。"""
    m = re.fullmatch(r"\$\.Invoice\.(d[A-Za-z_][A-Za-z0-9_]*)\.([A-Za-z_][A-Za-z0-9_]*)", path)
    if not m:
        return None
    return m.group(1), m.group(2)


def query_semantic_path(
    rows: list[dict[str, str]],
    path: str,
    d_invoice: str = "1",
    current_row: dict[str, str] | None = None,
) -> str:
    """
    扱う SemanticPath subset:
      const:VALUE
      $.Invoice.field
      $.Invoice.dDimension.field
      $.Invoice.dimension[?@.filterField="filterValue"].field
      $.Invoice.dimension[?@.filterField=filterValue].field
    """
    if path.startswith("const:"):
        return path.split(":", 1)[1]

    m = re.fullmatch(r"\$\.Invoice\.([A-Za-z_][A-Za-z0-9_]*)", path)
    if m:
        return first_value(rows, m.group(1), d_invoice)

    dim_target = direct_dimension_target(path)
    if dim_target:
        dimension, target_field = dim_target
        if current_row is not None and current_row.get(dimension):
            return current_row.get(target_field, "")
        repeated_rows = rows_by_dimension(rows, dimension, d_invoice)
        if repeated_rows:
            return repeated_rows[0].get(target_field, "")
        return ""

    m = re.fullmatch(
        r"\$\.Invoice\.([A-Za-z_][A-Za-z0-9_]*)"
        r"\[\?@\.([A-Za-z_][A-Za-z0-9_]*)=(?:\"([^\"]+)\"|([^\]]+))\]"
        r"\.([A-Za-z_][A-Za-z0-9_]*)",
        path,
    )
    if m:
        dimension, filter_field, quoted_value, bare_value, target_field = m.groups()
        filter_value = quoted_value if quoted_value is not None else bare_value
        for row in rows_by_dimension(rows, dimension, d_invoice):
            if row.get(filter_field) == filter_value:
                return row.get(target_field, "")
        return ""

    raise ValueError(f"Unsupported SemanticPath: {path}")
# end::semantic_path[]

def scalar_bindings(bindings: list[Binding], repeat_defs: dict[str, str]) -> list[Binding]:
    repeated_paths = tuple(repeat_defs.values())
    result: list[Binding] = []
    for semantic_path, xpath in bindings:
        elem_path, _ = split_attr_path(xpath)
        if any(elem_path.startswith(repeat_path + "/") for repeat_path in repeated_paths):
            continue
        result.append((semantic_path, xpath))
    return result


def common_path_prefix(paths: list[str]) -> str:
    split_paths = [path_parts(p) for p in paths]
    if not split_paths:
        return ""
    prefix = split_paths[0]
    for parts in split_paths[1:]:
        max_len = min(len(prefix), len(parts))
        i = 0
        while i < max_len and prefix[i] == parts[i]:
            i += 1
        prefix = prefix[:i]
        if not prefix:
            break
    return "/" + "/".join(prefix) if prefix else ""


def infer_repeat_definitions(bindings: list[Binding]) -> dict[str, str]:
    """dXXX の XPath 群から、そのディメンションが対応する繰返し XML 階層を推定します。"""
    by_dimension: dict[str, list[str]] = {}
    for semantic_path, xpath in bindings:
        dim_target = direct_dimension_target(semantic_path)
        if not dim_target:
            continue
        dimension, _ = dim_target
        by_dimension.setdefault(dimension, []).append(xpath)

    repeat_defs: dict[str, str] = {}
    for dimension, xpaths in by_dimension.items():
        repeat_path = common_path_prefix(xpaths)
        if repeat_path and repeat_path != "/Invoice":
            repeat_defs[dimension] = repeat_path
    return repeat_defs


def bindings_for_repeat_path(bindings: list[Binding], repeat_path: str) -> list[Binding]:
    result: list[Binding] = []
    for semantic_path, xpath in bindings:
        elem_path, _ = split_attr_path(xpath)
        if elem_path.startswith(repeat_path + "/"):
            result.append((semantic_path, xpath))
    return result


# tag::tax_total[]
def apply_repeating_section(
    invoice: ET.Element,
    rows: list[dict[str, str]],
    bindings: list[Binding],
    dimension: str,
    repeat_path: str,
    d_invoice: str = "1",
) -> None:
    # 同じ繰返し階層に属する binding 群を集める
    section_bindings = bindings_for_repeat_path(bindings, repeat_path)
    if not section_bindings:
        return

    repeat_rows = rows_by_dimension(rows, dimension, d_invoice)
    if not repeat_rows:
        return

    repeat_parts = path_parts(repeat_path)
    if len(repeat_parts) < 2 or repeat_parts[0] != "Invoice":
        raise ValueError(f"Invalid repeat path: {repeat_path}")

    container_path = "/" + "/".join(repeat_parts[:-1])
    repeat_tag = repeat_parts[-1]
    container = ensure_path(invoice, container_path)

    marker_path = repeat_path + "/"
    # dXXX ごとの各行を、対応する XML の反復要素として出力する
    for repeat_row in repeat_rows:
        repeat_elem = ET.SubElement(container, qname(repeat_tag))
        for semantic_path, xpath in section_bindings:
            _, suffix = xpath.split(marker_path, 1)
            value = query_semantic_path(rows, semantic_path, d_invoice, current_row=repeat_row)
            set_text_relative(repeat_elem, suffix, value)
# end::tax_total[]

# tag::csv_to_xml[]
def csv_to_ubl_xml(rows: list[dict[str, str]], bindings: list[Binding], d_invoice: str = "1") -> ET.Element:
    """構造化CSVを XML へ変換します。単一項目は binding で、明細行は補助処理で出力します。"""
    invoice = ET.Element(qname("Invoice"))
    # binding 定義を見て、dXXX に対応する XML の反復階層を特定する
    repeat_defs = infer_repeat_definitions(bindings)

    for semantic_path, xpath in scalar_bindings(bindings, repeat_defs):
        value = query_semantic_path(rows, semantic_path, d_invoice)
        set_text(invoice, xpath, value)

    for dimension, repeat_path in repeat_defs.items():
        apply_repeating_section(invoice, rows, bindings, dimension, repeat_path, d_invoice)

    # 明細行はこのサンプルでは補助処理で出力している
    append_invoice_lines(invoice, rows, first_value(rows, "currencyCode", d_invoice), d_invoice)
    return invoice
# end::csv_to_xml[]

# tag::invoice_lines[]
def append_invoice_lines(invoice: ET.Element, rows: list[dict[str, str]], currency: str, d_invoice: str = "1") -> None:
    """請求明細行を XML へ展開します。現状は結合表の完全自動化対象外です。"""
    for line_row in rows_by_dimension(rows, "dInvoiceLine", d_invoice):
        line = ET.SubElement(invoice, qname("cac:InvoiceLine"))
        ET.SubElement(line, qname("cbc:ID")).text = line_row["lineID"]
        ET.SubElement(line, qname("cbc:InvoicedQuantity"), {"unitCode": line_row.get("unitCode") or "EA"}).text = line_row["invoicedQuantity"]
        ET.SubElement(line, qname("cbc:LineExtensionAmount"), {"currencyID": currency}).text = line_row["lineExtensionAmount"]

        item = ET.SubElement(line, qname("cac:Item"))
        ET.SubElement(item, qname("cbc:Name")).text = line_row["itemName"]
        if line_row.get("lineTaxCategoryCode"):
            tax_category = ET.SubElement(item, qname("cac:ClassifiedTaxCategory"))
            ET.SubElement(tax_category, qname("cbc:ID")).text = line_row["lineTaxCategoryCode"]
            ET.SubElement(tax_category, qname("cbc:Percent")).text = line_row["lineTaxRate"]
            scheme = ET.SubElement(tax_category, qname("cac:TaxScheme"))
            ET.SubElement(scheme, qname("cbc:ID")).text = "VAT"

        price = ET.SubElement(line, qname("cac:Price"))
        ET.SubElement(price, qname("cbc:PriceAmount"), {"currencyID": currency}).text = line_row["unitPriceAmount"]
# end::invoice_lines[]

def new_empty_row(fieldnames: list[str], d_invoice: str = "1") -> dict[str, str]:
    row = {name: "" for name in fieldnames}
    row["dInvoice"] = d_invoice
    return row


# tag::xml_to_csv[]
def append_header_row(rows: list[dict[str, str]], invoice: ET.Element, fieldnames: list[str], d_invoice: str = "1") -> None:
    row = new_empty_row(fieldnames, d_invoice)
    row["invoiceNumber"] = get_text(invoice, "/Invoice/cbc:ID")
    row["issueDate"] = get_text(invoice, "/Invoice/cbc:IssueDate")
    row["invoiceTypeCode"] = get_text(invoice, "/Invoice/cbc:InvoiceTypeCode")
    row["currencyCode"] = get_text(invoice, "/Invoice/cbc:DocumentCurrencyCode")
    row["totalTaxAmount"] = get_text(invoice, "/Invoice/cac:TaxTotal/cbc:TaxAmount")
    rows.append(row)


def append_party_rows(rows: list[dict[str, str]], invoice: ET.Element, fieldnames: list[str], d_invoice: str = "1") -> None:
    party_specs = [
        ("1", "Seller", "/Invoice/cac:AccountingSupplierParty/cac:Party"),
        ("2", "Buyer", "/Invoice/cac:AccountingCustomerParty/cac:Party"),
    ]
    for d_party, role, party_xpath in party_specs:
        parts = [p for p in party_xpath.split("/") if p]
        party = invoice
        for part in parts[1:]:
            party = party.find(qname(part))
            if party is None:
                break
        if party is None:
            continue
        row = new_empty_row(fieldnames, d_invoice)
        row["dInvoiceParty"] = d_party
        row["partyRole"] = role
        name = party.find(f"{qname('cac:PartyName')}/{qname('cbc:Name')}")
        tax_id = party.find(f"{qname('cac:PartyTaxScheme')}/{qname('cbc:CompanyID')}")
        row["partyName"] = name.text if name is not None and name.text else ""
        row["partyTaxID"] = tax_id.text if tax_id is not None and tax_id.text else ""
        rows.append(row)


def append_document_reference_rows(rows: list[dict[str, str]], invoice: ET.Element, fieldnames: list[str], d_invoice: str = "1") -> None:
    ref_specs = [
        ("Order", "/Invoice/cac:OrderReference/cbc:ID"),
        ("Delivery", "/Invoice/cac:DespatchDocumentReference/cbc:ID"),
    ]
    ref_no = 1
    for ref_type, ref_xpath in ref_specs:
        ref_id = get_text(invoice, ref_xpath)
        if ref_id:
            row = new_empty_row(fieldnames, d_invoice)
            row["dDocumentReference"] = str(ref_no)
            row["referenceType"] = ref_type
            row["referenceID"] = ref_id
            rows.append(row)
            ref_no += 1


def append_payment_row(rows: list[dict[str, str]], invoice: ET.Element, fieldnames: list[str], d_invoice: str = "1") -> None:
    due_date = get_text(invoice, "/Invoice/cbc:DueDate")
    payment_id = get_text(invoice, "/Invoice/cac:PaymentMeans/cbc:PaymentID")
    if due_date or payment_id:
        row = new_empty_row(fieldnames, d_invoice)
        row["dPayment"] = "1"
        row["paymentDueDate"] = due_date
        row["paymentReference"] = payment_id
        rows.append(row)


def append_repeating_rows_from_bindings(
    rows: list[dict[str, str]],
    invoice: ET.Element,
    fieldnames: list[str],
    bindings: list[Binding],
    d_invoice: str = "1",
) -> None:
    # binding 定義を見て、dXXX に対応する XML の反復階層を特定する
    repeat_defs = infer_repeat_definitions(bindings)
    for dimension, repeat_path in repeat_defs.items():
        # 同じ繰返し階層に属する binding 群を集める
        section_bindings = bindings_for_repeat_path(bindings, repeat_path)
        repeat_parts = path_parts(repeat_path)
        if len(repeat_parts) < 2 or repeat_parts[0] != "Invoice":
            continue

        parent_path = "/" + "/".join(repeat_parts[:-1])
        repeat_tag = repeat_parts[-1]
        parent_elem = invoice
        for part in path_parts(parent_path)[1:]:
            parent_elem = parent_elem.find(qname(part))
            if parent_elem is None:
                break
        if parent_elem is None:
            continue

        # XML 側の反復要素を順に読み、構造化CSVの dXXX 行へ戻す
        repeat_elems = parent_elem.findall(qname(repeat_tag))
        marker_path = repeat_path + "/"
        for i, repeat_elem in enumerate(repeat_elems, start=1):
            row = new_empty_row(fieldnames, d_invoice)
            row[dimension] = str(i)
            for semantic_path, xpath in section_bindings:
                dim_target = direct_dimension_target(semantic_path)
                if not dim_target or dim_target[0] != dimension:
                    continue
                target_field = dim_target[1]
                _, suffix = xpath.split(marker_path, 1)
                row[target_field] = get_text_relative(repeat_elem, suffix)
            rows.append(row)


def append_invoice_line_rows(rows: list[dict[str, str]], invoice: ET.Element, fieldnames: list[str], d_invoice: str = "1") -> None:
    for i, line in enumerate(invoice.findall(qname("cac:InvoiceLine")), start=1):
        row = new_empty_row(fieldnames, d_invoice)
        row["dInvoiceLine"] = str(i)
        line_id = line.find(qname("cbc:ID"))
        quantity = line.find(qname("cbc:InvoicedQuantity"))
        line_amount = line.find(qname("cbc:LineExtensionAmount"))
        item = line.find(qname("cac:Item"))
        price = line.find(qname("cac:Price"))
        row["lineID"] = line_id.text if line_id is not None and line_id.text else ""
        if quantity is not None:
            row["invoicedQuantity"] = quantity.text or ""
            row["unitCode"] = quantity.attrib.get("unitCode", "")
        row["lineExtensionAmount"] = line_amount.text if line_amount is not None and line_amount.text else ""
        if item is not None:
            name = item.find(qname("cbc:Name"))
            row["itemName"] = name.text if name is not None and name.text else ""
            tax_category = item.find(qname("cac:ClassifiedTaxCategory"))
            if tax_category is not None:
                cat_id = tax_category.find(qname("cbc:ID"))
                percent = tax_category.find(qname("cbc:Percent"))
                row["lineTaxCategoryCode"] = cat_id.text if cat_id is not None and cat_id.text else ""
                row["lineTaxRate"] = percent.text if percent is not None and percent.text else ""
        if price is not None:
            price_amount = price.find(qname("cbc:PriceAmount"))
            row["unitPriceAmount"] = price_amount.text if price_amount is not None and price_amount.text else ""
        rows.append(row)


def ubl_xml_to_csv_rows(invoice: ET.Element, fieldnames: list[str], bindings: list[Binding], d_invoice: str = "1") -> list[dict[str, str]]:
    """XML を読み、構造化CSVの行リストへ戻します。"""
    rows: list[dict[str, str]] = []
    append_header_row(rows, invoice, fieldnames, d_invoice)
    append_party_rows(rows, invoice, fieldnames, d_invoice)
    append_document_reference_rows(rows, invoice, fieldnames, d_invoice)
    append_payment_row(rows, invoice, fieldnames, d_invoice)
    append_repeating_rows_from_bindings(rows, invoice, fieldnames, bindings, d_invoice)
    append_invoice_line_rows(rows, invoice, fieldnames, d_invoice)
    return rows
# end::xml_to_csv[]

# tag::output[]
def rows_to_csv(rows: list[dict[str, str]], fieldnames: list[str]) -> str:
    out = io.StringIO()
    writer = csv.DictWriter(out, fieldnames=fieldnames, lineterminator="\n")
    writer.writeheader()
    writer.writerows(rows)
    return out.getvalue()


def write_xml(elem: ET.Element, path: str | Path) -> None:
    tree = ET.ElementTree(elem)
    ET.indent(tree, space="  ")
    tree.write(path, encoding="utf-8", xml_declaration=True)
# end::output[]

# tag::main[]
def main() -> None:
    parser = argparse.ArgumentParser(
        description="構造化CSVと簡略化した UBL Invoice XML の相互変換サンプル",
        epilog="例: py .\\syntax_binding.py .\\invoice.csv .\\bindings.csv --xml-out .\\invoice.xml --roundtrip-out .\\roundtrip.csv",
    )
    parser.add_argument("invoice_csv", type=Path, help="入力となる構造化CSVファイル")
    parser.add_argument("bindings_csv", type=Path, help="Semantic path と XPath を対応付けた binding CSV")
    parser.add_argument("--xml-out", type=Path, default=Path("invoice.xml"), help="生成する XML の出力先")
    parser.add_argument("--roundtrip-out", type=Path, default=Path("roundtrip.csv"), help="XML を読み戻して再構成した CSV の出力先")
    args = parser.parse_args()

    # 1) 入力 CSV と binding CSV を読む
    rows = read_core_csv_file(args.invoice_csv)
    bindings = read_bindings_csv_file(args.bindings_csv)
    # 2) CSV -> XML 変換
    invoice = csv_to_ubl_xml(rows, bindings)
    write_xml(invoice, args.xml_out)

    # 3) XML -> CSV の逆変換を行い、roundtrip.csv を出力する
    fieldnames = list(rows[0].keys()) if rows else []
    roundtrip_rows = ubl_xml_to_csv_rows(invoice, fieldnames, bindings)
    args.roundtrip_out.write_text(rows_to_csv(roundtrip_rows, fieldnames), encoding="utf-8")
# end::main[]

if __name__ == "__main__":
    main()


投稿日

カテゴリー:

, , , ,

投稿者:

タグ:

コメント

コメントを残す

メールアドレスが公開されることはありません。 が付いている欄は必須項目です