147 lines
6 KiB
Python
147 lines
6 KiB
Python
from typing import List, Tuple, Generator
|
|
import arpeggio
|
|
from itertools import chain
|
|
from collections.abc import Mapping
|
|
|
|
__all__ = ["js1_style", "js2_style"]
|
|
|
|
grammar = """
|
|
# starting point for jobshop1 input file
|
|
job_shop1 = skip_preface
|
|
# eat away lines of preface, until first problem_instance is
|
|
# encountered; then the list of instances start
|
|
skip_preface = (!problem_instance r"[^\n]+" skip_preface) / (eol skip_preface) / instance_list
|
|
instance_list = problem_instance (sep_line trim_ws eol problem_instance eol?)* eof_sep
|
|
problem_instance = trim_ws "instance" ' ' instance_name trim_ws eol trim_ws eol sep_line description eol problem_data
|
|
description = r"[^\n]*"
|
|
instance_name = r"\w+"
|
|
sep_line = trim_ws plus_line trim_ws eol
|
|
# lines out of multiple + signs
|
|
plus_line = r"\+\+\++"
|
|
# EOF is a builtin rule matching end of file
|
|
eof_sep = trim_ws plus_line " EOF " plus_line trim_ws eol* EOF
|
|
# entry point for jobshop2 input files
|
|
job_shop2 = problem_data EOF
|
|
problem_data = trim_ws num_jobs ' ' num_machines eol job_data+
|
|
# used for skipping arbitrary number of non-breaking whitespace
|
|
trim_ws = r'[ \t]*'
|
|
# git may change line-endings on windows, so we have to match on both
|
|
eol = "\n" / "\r\n"
|
|
nonneg_num = r'\d+'
|
|
num_jobs = nonneg_num
|
|
num_machines = nonneg_num
|
|
machine = nonneg_num
|
|
duration = nonneg_num
|
|
# task data for 1 job
|
|
job_data = ' '* machine ' '+ duration (' '+ machine ' '+ duration)* trim_ws eol
|
|
"""
|
|
|
|
|
|
class ParseError(Exception):
|
|
"""To be thrown when parsing goes wrong"""
|
|
|
|
def __init__(self, message: str) -> None:
|
|
self.message = message
|
|
|
|
|
|
class JobShopProblem(Mapping):
|
|
|
|
def __init__(
|
|
self, jobs: int, machines: int,
|
|
problem_data: List[List[Tuple[int, int]]],
|
|
name: str = 'unnamed', description: str = '') -> None:
|
|
# check plausibility of input
|
|
if len(problem_data) != jobs:
|
|
raise ParseError("Given number of jobs for problem \"{}\" differs from actual job data.".format(name))
|
|
if max([machine for (duration, machine) in chain.from_iterable(problem_data)]) > machines-1:
|
|
raise ParseError("Higher machine number found in problem data than specified at problem head")
|
|
|
|
self.description = description
|
|
self.name = name
|
|
self.machines = machines
|
|
self.jobs = jobs
|
|
self.problem_data = problem_data
|
|
|
|
def __str__(self) -> str:
|
|
return "JobShopProblem " + str(self.name)
|
|
|
|
# make this behave like a mapping (e.g. a dict)
|
|
def __getitem__(self, key: Tuple[int, int]) -> Tuple[int, int]:
|
|
try:
|
|
# unpacking key
|
|
(jobnum, tasknum) = key
|
|
return self.problem_data[jobnum][tasknum]
|
|
except ValueError as e:
|
|
raise TypeError("Key must be a (int, int) tuple")
|
|
|
|
def __iter__(self):
|
|
self.iterpos1 = 0
|
|
self.iterpos2 = 0
|
|
return self
|
|
|
|
def __next__(self) -> Tuple[int, int]:
|
|
if self.iterpos1 >= len(self.problem_data):
|
|
raise StopIteration
|
|
return_key = (self.iterpos1, self.iterpos2)
|
|
self.iterpos2 += 1
|
|
if self.iterpos2 >= len(self.problem_data[self.iterpos1]):
|
|
self.iterpos2 = 0
|
|
self.iterpos1 += 1
|
|
return return_key
|
|
|
|
|
|
def __len__(self) -> int:
|
|
return sum(map(len, self.problem_data))
|
|
|
|
def get_tasks_by_job(self, job: int) -> List[Tuple[int, int]]:
|
|
return self.problem_data[job]
|
|
|
|
class JobShopVisitor(arpeggio.PTNodeVisitor):
|
|
"""contains visitor functions needed for both jobshop1 (list of instances)
|
|
and jobshop2 (single instance without name & description) input data"""
|
|
|
|
def visit_nonneg_num(
|
|
self, node: arpeggio.ParseTreeNode,
|
|
children: arpeggio.SemanticActionResults) -> int:
|
|
if self.debug:
|
|
print("Converting non-negative integer", node.value)
|
|
return int(node.value)
|
|
|
|
def visit_machine(self, node: arpeggio.ParseTreeNode, children: arpeggio.SemanticActionResults) -> int:
|
|
return int(node.value)
|
|
|
|
def visit_duration(self, node: arpeggio.ParseTreeNode, children: arpeggio.SemanticActionResults) -> int:
|
|
return int(node.value)
|
|
|
|
def visit_num_machines(self, node: arpeggio.ParseTreeNode, children: arpeggio.SemanticActionResults) -> int:
|
|
return int(node.value)
|
|
|
|
def visit_num_jobs(self, node: arpeggio.ParseTreeNode, children: arpeggio.SemanticActionResults) -> int:
|
|
return int(node.value)
|
|
|
|
def visit_job_data(self, node: arpeggio.ParseTreeNode, children: arpeggio.SemanticActionResults) -> List[Tuple[int, int]]:
|
|
if self.debug:
|
|
print("Job data:\nnode:", type(node), "children:", children)
|
|
job_numbers = list(filter(lambda x: type(x) is int, children))
|
|
# job data needs to consist out of pairs of numbers
|
|
if len(job_numbers) % 2 == 1:
|
|
raise ParseError("Odd number of numbers in job data")
|
|
# returns list of (duration, machine) tuples
|
|
# [::2] returns only every second element of the list
|
|
return list(zip(job_numbers[1::2], job_numbers[0::2]))
|
|
|
|
def visit_problem_data(
|
|
self, node: arpeggio.ParseTreeNode,
|
|
children: arpeggio.SemanticActionResults) -> JobShopProblem:
|
|
if self.debug:
|
|
print("problem_data\nchildren:", children)
|
|
# filter out newlines or other strings
|
|
cleaned_data = list(filter(lambda x: type(x) is not str, children))
|
|
problem_data: List[List[Tuple[int, int]]] = [
|
|
cleaned_data[i] for i in range(2, len(cleaned_data))]
|
|
problem = JobShopProblem(children[0], children[1], problem_data)
|
|
if self.debug:
|
|
print("\nreturning a", type(problem), "\n")
|
|
print("problem_data:", problem_data)
|
|
return problem
|